[jira] [Commented] (SPARK-47063) CAST long to timestamp has different behavior for codegen vs interpreted

2024-02-26 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820877#comment-17820877
 ] 

Robert Joseph Evans commented on SPARK-47063:
-

[~planga82] I was not planning on putting up a patch, but I would be willing 
to if no one else wants to put one up. I would just need to know whether we 
want to clamp the result or if we are okay with the overflow.
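
For reference, a minimal sketch of what the "clamp" option could mean (an illustration only, not a proposed patch; it just mirrors the saturating behavior of {{SECONDS.toMicros}}):
{code:java}
// Hedged sketch: clamp seconds -> microseconds on overflow instead of wrapping.
def secondsToMicrosClamped(seconds: Long): Long = {
  val microsPerSecond = 1000000L
  try {
    Math.multiplyExact(seconds, microsPerSecond)
  } catch {
    case _: ArithmeticException =>
      if (seconds > 0) Long.MaxValue else Long.MinValue
  }
}

// secondsToMicrosClamped(Long.MaxValue) == Long.MaxValue, matching the
// interpreted path, while the generated multiply currently wraps to -1000000.
{code}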

> CAST long to timestamp has different behavior for codegen vs interpreted
> 
>
> Key: SPARK-47063
> URL: https://issues.apache.org/jira/browse/SPARK-47063
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.4.2
>Reporter: Robert Joseph Evans
>Priority: Major
>
> It probably impacts a lot more versions of the code than this, but I verified 
> it on 3.4.2. This also appears to be related to 
> https://issues.apache.org/jira/browse/SPARK-39209
> {code:java}
> scala> Seq(Long.MaxValue, Long.MinValue, 0L, 1990L).toDF("v").selectExpr("*", 
> "CAST(v AS timestamp) as ts").selectExpr("*", "unix_micros(ts)").show(false)
> +--------------------+-----------------------------+--------------------+
> |v                   |ts                           |unix_micros(ts)     |
> +--------------------+-----------------------------+--------------------+
> |9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
> |-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
> |0                   |1970-01-01 00:00:00          |0                   |
> |1990                |1970-01-01 00:33:10          |1990000000          |
> +--------------------+-----------------------------+--------------------+
> scala> Seq(Long.MaxValue, Long.MinValue, 0L, 
> 1990L).toDF("v").repartition(1).selectExpr("*", "CAST(v AS timestamp) as 
> ts").selectExpr("*", "unix_micros(ts)").show(false)
> +--------------------+-------------------+---------------+
> |v                   |ts                 |unix_micros(ts)|
> +--------------------+-------------------+---------------+
> |9223372036854775807 |1969-12-31 23:59:59|-1000000       |
> |-9223372036854775808|1970-01-01 00:00:00|0              |
> |0                   |1970-01-01 00:00:00|0              |
> |1990                |1970-01-01 00:33:10|1990000000     |
> +--------------------+-------------------+---------------+
> {code}
> It looks like InMemoryTableScanExec is not doing code generation for the 
> expressions, but the ProjectExec after the repartition is.
> If I disable code gen I get the same answer in both cases.
> {code:java}
> scala> spark.conf.set("spark.sql.codegen.wholeStage", false)
> scala> spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")
> scala> Seq(Long.MaxValue, Long.MinValue, 0L, 1990L).toDF("v").selectExpr("*", 
> "CAST(v AS timestamp) as ts").selectExpr("*", "unix_micros(ts)").show(false)
> +--------------------+-----------------------------+--------------------+
> |v                   |ts                           |unix_micros(ts)     |
> +--------------------+-----------------------------+--------------------+
> |9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
> |-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
> |0                   |1970-01-01 00:00:00          |0                   |
> |1990                |1970-01-01 00:33:10          |1990000000          |
> +--------------------+-----------------------------+--------------------+
> scala> Seq(Long.MaxValue, Long.MinValue, 0L, 
> 1990L).toDF("v").repartition(1).selectExpr("*", "CAST(v AS timestamp) as 
> ts").selectExpr("*", "unix_micros(ts)").show(false)
> +--------------------+-----------------------------+--------------------+
> |v                   |ts                           |unix_micros(ts)     |
> +--------------------+-----------------------------+--------------------+
> |9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
> |-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
> |0                   |1970-01-01 00:00:00          |0                   |
> |1990                |1970-01-01 00:33:10          |1990000000          |
> +--------------------+-----------------------------+--------------------+
> {code}
> [https://github.com/apache/spark/blob/e2cd71a4cd54bbdf5af76d3edfbb2fc8c1b067b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L1627]
> is the code used in codegen, but
> [https://github.com/apache/spark/blob/e2cd71a4cd54bbdf5af76d3edfbb2fc8c1b067b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L687]
> is what is used outside of codegen.
> Apparently `SECONDS.toMicros` saturates (clamps) the value on overflow, but the 
> codegen multiplication wraps around.
> {code:java}
> scala> Long.MaxValue
> res11: Long = 9223372036854775807
> scala> java.util.concurrent.TimeUnit.SECONDS.toMicros(Long.MaxValue)
> res12: Long = 9223372036854775807
> scala> Long.MaxValue * (1000L * 1000L)
> res13: Long = -1000000
> {code}

[jira] [Created] (SPARK-47063) CAST long to timestamp has different behavior for codegen vs interpreted

2024-02-15 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-47063:
---

 Summary: CAST long to timestamp has different behavior for codegen 
vs interpreted
 Key: SPARK-47063
 URL: https://issues.apache.org/jira/browse/SPARK-47063
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.2
Reporter: Robert Joseph Evans


It probably impacts a lot more versions of the code than this, but I verified 
it on 3.4.2. This also appears to be related to 
https://issues.apache.org/jira/browse/SPARK-39209
{code:java}
scala> Seq(Long.MaxValue, Long.MinValue, 0L, 1990L).toDF("v").selectExpr("*", 
"CAST(v AS timestamp) as ts").selectExpr("*", "unix_micros(ts)").show(false)
+--------------------+-----------------------------+--------------------+
|v                   |ts                           |unix_micros(ts)     |
+--------------------+-----------------------------+--------------------+
|9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
|-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
|0                   |1970-01-01 00:00:00          |0                   |
|1990                |1970-01-01 00:33:10          |1990000000          |
+--------------------+-----------------------------+--------------------+
scala> Seq(Long.MaxValue, Long.MinValue, 0L, 
1990L).toDF("v").repartition(1).selectExpr("*", "CAST(v AS timestamp) as 
ts").selectExpr("*", "unix_micros(ts)").show(false)
+--------------------+-------------------+---------------+
|v                   |ts                 |unix_micros(ts)|
+--------------------+-------------------+---------------+
|9223372036854775807 |1969-12-31 23:59:59|-1000000       |
|-9223372036854775808|1970-01-01 00:00:00|0              |
|0                   |1970-01-01 00:00:00|0              |
|1990                |1970-01-01 00:33:10|1990000000     |
+--------------------+-------------------+---------------+
{code}
It looks like InMemoryTableScanExec is not doing code generation for the 
expressions, but the ProjectExec after the repartition is.

If I disable code gen I get the same answer in both cases.
{code:java}
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)
scala> spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")
scala> Seq(Long.MaxValue, Long.MinValue, 0L, 1990L).toDF("v").selectExpr("*", 
"CAST(v AS timestamp) as ts").selectExpr("*", "unix_micros(ts)").show(false)
+--------------------+-----------------------------+--------------------+
|v                   |ts                           |unix_micros(ts)     |
+--------------------+-----------------------------+--------------------+
|9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
|-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
|0                   |1970-01-01 00:00:00          |0                   |
|1990                |1970-01-01 00:33:10          |1990000000          |
+--------------------+-----------------------------+--------------------+
scala> Seq(Long.MaxValue, Long.MinValue, 0L, 
1990L).toDF("v").repartition(1).selectExpr("*", "CAST(v AS timestamp) as 
ts").selectExpr("*", "unix_micros(ts)").show(false)
+--------------------+-----------------------------+--------------------+
|v                   |ts                           |unix_micros(ts)     |
+--------------------+-----------------------------+--------------------+
|9223372036854775807 |+294247-01-10 04:00:54.775807|9223372036854775807 |
|-9223372036854775808|-290308-12-21 19:59:05.224192|-9223372036854775808|
|0                   |1970-01-01 00:00:00          |0                   |
|1990                |1970-01-01 00:33:10          |1990000000          |
+--------------------+-----------------------------+--------------------+
{code}
[https://github.com/apache/spark/blob/e2cd71a4cd54bbdf5af76d3edfbb2fc8c1b067b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L1627]

is the code used in codegen, but

[https://github.com/apache/spark/blob/e2cd71a4cd54bbdf5af76d3edfbb2fc8c1b067b6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L687]

is what is used outside of codegen.

Apparently `SECONDS.toMicros` saturates (clamps) the value on overflow, but the 
codegen multiplication wraps around.
{code:java}
scala> Long.MaxValue
res11: Long = 9223372036854775807
scala> java.util.concurrent.TimeUnit.SECONDS.toMicros(Long.MaxValue)
res12: Long = 9223372036854775807
scala> Long.MaxValue * (1000L * 1000L)
res13: Long = -1000000
{code}
Ideally these would be consistent with each other. I personally would prefer 
the clamping behavior as it feels more accurate, but I am fine either way.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

[jira] [Created] (SPARK-46778) get_json_object flattens wildcard queries that match a single value

2024-01-19 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-46778:
---

 Summary: get_json_object flattens wildcard queries that match a 
single value
 Key: SPARK-46778
 URL: https://issues.apache.org/jira/browse/SPARK-46778
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.1
Reporter: Robert Joseph Evans


I think this impacts all versions of {{get_json_object}}, but I am not 100% 
sure.

The unit test for 
[$.store.book[*].reader|https://github.com/apache/spark/blob/39f8e1a5953b5897f893151d24dc585a80c0c8a0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala#L142-L146]
 verifies that the output of this query is a single level JSON array, but when 
I put the same JSON and JSON path into [http://jsonpath.com/] I get a result 
with multiple levels of nesting. It looks like Apache Spark tries to flatten 
lists for {{[*]}} matches when there is only a single element that matches.
{code:java}
scala> 
Seq("""[{"a":"A"},{"b":"B"}]""","""[{"a":"A"},{"a":"B"}]""").toDF("jsonStr").selectExpr("""get_json_object(jsonStr,"$[*].a")""").show(false)
+--------------------------------+
|get_json_object(jsonStr, $[*].a)|
+--------------------------------+
|"A"                             |
|["A","B"]                       |
+--------------------------------+ {code}
But this is a problem because I no longer get a consistent schema back, even 
if the input schema is known to be consistent. For example, if I wanted to 
know how many elements matched this query, I could wrap it in 
{{json_array_length}}, but that does not work in the general case.
{code:java}
scala> 
Seq("""[{"a":"A"},{"b":"B"}]""","""[{"a":"A"},{"a":"B"}]""").toDF("jsonStr").selectExpr("""json_array_length(get_json_object(jsonStr,"$[*].a"))""").show(false)
+---------------------------------------------------+
|json_array_length(get_json_object(jsonStr, $[*].a))|
+---------------------------------------------------+
|null                                               |
|2                                                  |
+---------------------------------------------------+ {code}
If the values that match are themselves JSON arrays, then I do get a number 
back, but it is the wrong one.
{code:java}
scala> 
Seq("""[{"a":[1,2,3,4,5]},{"b":"B"}]""","""[{"a":[1,2,3,4,5]},{"a":[1,2,3,4,5]}]""").toDF("jsonStr").selectExpr("""json_array_length(get_json_object(jsonStr,"$[*].a"))""").show(false)
+---------------------------------------------------+
|json_array_length(get_json_object(jsonStr, $[*].a))|
+---------------------------------------------------+
|5                                                  |
|2                                                  |
+---------------------------------------------------+ {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46761) quoted strings in a JSON path should support ? characters

2024-01-18 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-46761:
---

 Summary: quoted strings in a JSON path should support ? characters
 Key: SPARK-46761
 URL: https://issues.apache.org/jira/browse/SPARK-46761
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0, 4.0.0
Reporter: Robert Joseph Evans


I think this impacts all versions of Spark after SPARK-18677, which made the 
operator work at all in 2.1.0/2.0.3.

It comes down to
{code:java}
 name <- '.' ~> "[^\\.\\[]+".r | "['" ~> "[^\\'\\?]+".r <~ "']"
{code}
[https://github.com/apache/spark/blob/01bb1b1a3dbfc68f41d9b13de863d26d587c7e2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L79]

 

The regular expression/pattern is saying that we want a [' followed by one or 
more characters that are not a single quote ' or a question mark ?, followed by 
']. That question mark looks out of place. When I put a question mark inside a 
quoted string the path fails to produce any result, but when I put the same 
data/path into [https://jsonpath.com/] I get a result.

 

data
{code:java}
{"?":"QUESTION"} {code}
path
{code:java}
$['?'] {code}
 

I also see no tests validating that a question mark is not allowed, so I suspect 
that it is a long-standing bug.
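
For illustration, here is a small self-contained check (plain JVM regexes, not the Spark parser itself; the patterns below only mirror the character classes quoted above) showing that dropping the question-mark exclusion from the quoted-name rule is enough to accept {{$['?']}}:
{code:java}
import scala.util.matching.Regex

object QuotedNameCheck {
  def main(args: Array[String]): Unit = {
    // Mirrors the existing rule: neither a single quote nor a question mark allowed.
    val current: Regex = "\\['([^\\'\\?]+)'\\]".r
    // Relaxed rule: only the single quote terminates the bracketed name.
    val relaxed: Regex = "\\['([^\\']+)'\\]".r

    val path = "['?']"
    println(current.findFirstMatchIn(path).map(_.group(1))) // None
    println(relaxed.findFirstMatchIn(path).map(_.group(1))) // Some(?)
  }
}
{code}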



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45879) Number check for InputFileBlockSources is missing for V2 source (BatchScan) ?

2023-11-13 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-45879:

Affects Version/s: 3.4.1
   3.2.3

> Number check for InputFileBlockSources is missing for V2 source (BatchScan) ?
> -
>
> Key: SPARK-45879
> URL: https://issues.apache.org/jira/browse/SPARK-45879
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.3, 3.4.1, 3.5.0
> Environment: I tried on Spark 3.2.3 and Spark 3.4.1, and both can 
> reproduce this issue.
>Reporter: Liangcai li
>Priority: Major
>
> When doing a join with the "input_file_name()" function, it will blow up with 
> an
> *AnalysisException* if using the v1 data source (FileSourceScan). That's ok.
>  
> But if we change to use the v2 data source (BatchScan), the expected 
> exception is gone, and the join passes.
> Is this number check for InputFileBlockSources missing for the V2 data source, 
> or is it by design?
>  
> Repro steps:
> {code:java}
> scala> spark.range(100).withColumn("const1", 
> lit("from_t1")).write.parquet("/data/tmp/t1")
>  
> scala> spark.range(100).withColumn("const2", 
> lit("from_t2")).write.parquet("/data/tmp/t2")
>  
> scala> spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")
>  
> scala> 
> spark.read.parquet("/data/tmp/t1").join(spark.read.parquet("/data/tmp/t2"), 
> "id", "inner").selectExpr("*", "input_file_name()").show(5, false)
> org.apache.spark.sql.AnalysisException: 'input_file_name' does not support 
> more than one sources.; line 1 pos 0;
> Project id#376L, const1#377, const2#381, input_file_name() AS 
> input_file_name()#389
> +- Project id#376L, const1#377, const2#381
>    +- Join Inner, (id#376L = id#380L)
>       :- Relation id#376L,const1#377 parquet
>       +- Relation id#380L,const2#381 parquet
>  
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:52)
>   at 
> org.apache.spark.sql.execution.datasources.PreReadCheck$.org$apache$spark$sql$execution$datasources$PreReadCheck$$checkNumInputFileBlockSources(rules.scala:476)
>   at 
> org.apache.spark.sql.execution.datasources.PreReadCheck$.$anonfun$checkNumInputFileBlockSources$2(rules.scala:472)
>   at 
> org.apache.spark.sql.execution.datasources.PreReadCheck$.$anonfun$checkNumInputFileBlockSources$2$adapted(rules.scala:472)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> scala> spark.conf.set("spark.sql.sources.useV1SourceList", "")
>  
> scala> 
> spark.read.parquet("/data/tmp/t1").join(spark.read.parquet("/data/tmp/t2"), 
> "id", "inner").selectExpr("*", "input_file_name()").show(5, false)
> +---+-------+-------+---------------------------------------------------------------------------------------+
> |id |const1 |const2 |input_file_name()                                                                      |
> +---+-------+-------+---------------------------------------------------------------------------------------+
> |91 |from_t1|from_t2|file:///data/tmp/t1/part-00011-a52b9990-4463-447c-9cdf-7a84542de2f7-c000.snappy.parquet|
> |92 |from_t1|from_t2|file:///data/tmp/t1/part-00011-a52b9990-4463-447c-9cdf-7a84542de2f7-c000.snappy.parquet|
> |93 |from_t1|from_t2|file:///data/tmp/t1/part-00011-a52b9990-4463-447c-9cdf-7a84542de2f7-c000.snappy.parquet|
> |94 |from_t1|from_t2|file:///data/tmp/t1/part-00011-a52b9990-4463-447c-9cdf-7a84542de2f7-c000.snappy.parquet|
> |95 |from_t1|from_t2|file:///data/tmp/t1/part-00011-a52b9990-4463-447c-9cdf-7a84542de2f7-c000.snappy.parquet|
> +---+-------+-------+---------------------------------------------------------------------------------------+
> only showing top 5 rows{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2023-10-18 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-45599:

Priority: Blocker  (was: Major)

> Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset
> --
>
> Key: SPARK-45599
> URL: https://issues.apache.org/jira/browse/SPARK-45599
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.2.3, 3.5.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: data-corruption
>
> I think this actually impacts all versions that have ever supported 
> percentile and it may impact other things because the bug is in OpenHashMap.
>  
> I am really surprised that we caught this bug because everything has to hit 
> just wrong to make it happen. In python/pyspark, if you run
>  
> {code:python}
> from math import *
> from pyspark.sql.types import *
> data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
> (5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
> (-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
> (2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
> (-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), 
> (nan,), (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> (-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
> (-5.620339412794757e-251,), (3.5103766991437114e-60,), 
> (2.4925669515657655e+165,), (3.217759099462207e+108,), 
> (-8.796717685143486e+203,), (2.037360925124577e+292,), 
> (-6.542279108216022e+206,), (-7.951172614280046e-74,), 
> (6.226527569272003e+152,), (-5.673977270111637e-84,), 
> (-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
> (4.205809391029644e+137,), (-9.871721037428167e+119,), (None,), 
> (-1.6663254121185628e-256,), (1.0075153091760986e-236,), (-0.0,), (0.0,), 
> (1.7976931348623157e+308,), (4.3214483342777574e-117,), 
> (-7.973642629411105e-89,), (-1.1028137694801181e-297,), 
> (2.9000325280299273e-39,), (-1.077534929323113e-264,), 
> (-1.1847952892216515e+137,), (nan,), (7.849390806334983e+226,), 
> (-1.831402251805194e+65,), (-2.664533698035492e+203,), 
> (-2.2385155698231885e+285,), (-2.3016388448634844e-155,), 
> (-9.607772864590422e+217,), (3.437191836077251e+209,), 
> (1.9846569552093057e-137,), (-3.010452936419635e-233,), 
> (1.4309793775440402e-87,), (-2.9383643865423363e-103,), 
> (-4.696878567317712e-162,), (8.391630779050713e-135,), (nan,), 
> (-3.3885098786542755e-128,), (-4.5154178008513483e-122,), (nan,), (nan,), 
> (2.187766760184779e+306,), (7.679268835670585e+223,), 
> (6.3131466321042515e+153,), (1.779652973678931e+173,), 
> (9.247723870123388e-295,), (5.891823952773268e+98,), (inf,), 
> (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
> (-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
> (2.5212410617263588e-282,), (-2.646144697462316e-35,), 
> (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
> (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> 

[jira] [Updated] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2023-10-18 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-45599:

Labels: data-corruption  (was: )

> Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset
> --
>
> Key: SPARK-45599
> URL: https://issues.apache.org/jira/browse/SPARK-45599
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0, 3.2.3, 3.5.0
>Reporter: Robert Joseph Evans
>Priority: Major
>  Labels: data-corruption
>
> I think this actually impacts all versions that have ever supported 
> percentile and it may impact other things because the bug is in OpenHashMap.
>  
> I am really surprised that we caught this bug because everything has to hit 
> just wrong to make it happen. In python/pyspark, if you run
>  
> {code:python}
> from math import *
> from pyspark.sql.types import *
> data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
> (5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
> (-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
> (2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
> (-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), 
> (nan,), (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> (-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
> (-5.620339412794757e-251,), (3.5103766991437114e-60,), 
> (2.4925669515657655e+165,), (3.217759099462207e+108,), 
> (-8.796717685143486e+203,), (2.037360925124577e+292,), 
> (-6.542279108216022e+206,), (-7.951172614280046e-74,), 
> (6.226527569272003e+152,), (-5.673977270111637e-84,), 
> (-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
> (4.205809391029644e+137,), (-9.871721037428167e+119,), (None,), 
> (-1.6663254121185628e-256,), (1.0075153091760986e-236,), (-0.0,), (0.0,), 
> (1.7976931348623157e+308,), (4.3214483342777574e-117,), 
> (-7.973642629411105e-89,), (-1.1028137694801181e-297,), 
> (2.9000325280299273e-39,), (-1.077534929323113e-264,), 
> (-1.1847952892216515e+137,), (nan,), (7.849390806334983e+226,), 
> (-1.831402251805194e+65,), (-2.664533698035492e+203,), 
> (-2.2385155698231885e+285,), (-2.3016388448634844e-155,), 
> (-9.607772864590422e+217,), (3.437191836077251e+209,), 
> (1.9846569552093057e-137,), (-3.010452936419635e-233,), 
> (1.4309793775440402e-87,), (-2.9383643865423363e-103,), 
> (-4.696878567317712e-162,), (8.391630779050713e-135,), (nan,), 
> (-3.3885098786542755e-128,), (-4.5154178008513483e-122,), (nan,), (nan,), 
> (2.187766760184779e+306,), (7.679268835670585e+223,), 
> (6.3131466321042515e+153,), (1.779652973678931e+173,), 
> (9.247723870123388e-295,), (5.891823952773268e+98,), (inf,), 
> (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
> (-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
> (2.5212410617263588e-282,), (-2.646144697462316e-35,), 
> (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
> (1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
> (-5.682293414619055e+46,), (-4.585039307326895e+166,), 
> (-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
> (None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
> (-5.046677974902737e+132,), (-5.490780063080251e-09,), 
> (1.703824427218836e-55,), (-1.1961155424160076e+102,), 
> (1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
> (5.120795466142678e-215,), (-9.01991342808203e+282,), 
> (4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
> (3.4543959813437507e-304,), (-7.590734560275502e-63,), 
> (9.376528689861087e+117,), (-2.1696969883753554e-292,), 
> (7.227411393136537e+206,), (-2.428999624265911e-293,), 
> (5.741383583382542e-14,), (-1.4882040107841963e+286,), 
> (2.1973064836362255e-159,), (0.028096279323357867,), 
> (8.475809563703283e-64,), (3.002803065141241e-139,), 
> 

[jira] [Created] (SPARK-45599) Percentile can produce a wrong answer if -0.0 and 0.0 are mixed in the dataset

2023-10-18 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-45599:
---

 Summary: Percentile can produce a wrong answer if -0.0 and 0.0 are 
mixed in the dataset
 Key: SPARK-45599
 URL: https://issues.apache.org/jira/browse/SPARK-45599
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.5.0, 3.2.3, 3.3.0
Reporter: Robert Joseph Evans


I think this actually impacts all versions that have ever supported percentile 
and it may impact other things because the bug is in OpenHashMap.
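
As background, here is a plain JVM check (not the OpenHashMap code itself) of how -0.0 and 0.0 can be treated as the same value by equality but as different keys by anything that hashes their raw bits:

{code:java}
val posZero = 0.0
val negZero = -0.0

println(posZero == negZero)                               // true: they compare equal
println(java.lang.Double.doubleToRawLongBits(posZero) ==
  java.lang.Double.doubleToRawLongBits(negZero))          // false: different bit patterns
println(java.lang.Double.hashCode(posZero) ==
  java.lang.Double.hashCode(negZero))                     // false: different hash codes
{code}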

 

I am really surprised that we caught this bug because everything has to hit 
just wrong to make it happen. In python/pyspark, if you run

 
{code:python}
from math import *
from pyspark.sql.types import *

data = [(1.779652973678931e+173,), (9.247723870123388e-295,), 
(5.891823952773268e+98,), (inf,), (1.9042708096454302e+195,), 
(-3.085825028509117e+74,), (-1.9569489404314425e+128,), 
(2.0738138203216883e+201,), (inf,), (2.5212410617263588e-282,), 
(-2.646144697462316e-35,), (-3.468683249247593e-196,), (nan,), (None,), (nan,), 
(1.822129180806602e-245,), (5.211702553315461e-259,), (-1.0,), 
(-5.682293414619055e+46,), (-4.585039307326895e+166,), 
(-5.936844510098297e-82,), (-5234708055733.116,), (4920675036.053339,), 
(None,), (4.4501477170144023e-308,), (2.176024662699802e-210,), 
(-5.046677974902737e+132,), (-5.490780063080251e-09,), 
(1.703824427218836e-55,), (-1.1961155424160076e+102,), 
(1.4403274475565667e+41,), (None,), (5.4470705929955455e-86,), 
(5.120795466142678e-215,), (-9.01991342808203e+282,), 
(4.051866849943636e-254,), (-3588518231990.927,), (-1.8891559842111865e+63,), 
(3.4543959813437507e-304,), (-7.590734560275502e-63,), 
(9.376528689861087e+117,), (-2.1696969883753554e-292,), 
(7.227411393136537e+206,), (-2.428999624265911e-293,), 
(5.741383583382542e-14,), (-1.4882040107841963e+286,), 
(2.1973064836362255e-159,), (0.028096279323357867,), (8.475809563703283e-64,), 
(3.002803065141241e-139,), (-1.1041009815645263e+203,), 
(1.8461539468514548e-225,), (-5.620339412794757e-251,), 
(3.5103766991437114e-60,), (2.4925669515657655e+165,), 
(3.217759099462207e+108,), (-8.796717685143486e+203,), 
(2.037360925124577e+292,), (-6.542279108216022e+206,), 
(-7.951172614280046e-74,), (6.226527569272003e+152,), 
(-5.673977270111637e-84,), (-1.0186016078084965e-281,), 
(1.7976931348623157e+308,), (4.205809391029644e+137,), 
(-9.871721037428167e+119,), (None,), (-1.6663254121185628e-256,), 
(1.0075153091760986e-236,), (-0.0,), (0.0,), (1.7976931348623157e+308,), 
(4.3214483342777574e-117,), (-7.973642629411105e-89,), 
(-1.1028137694801181e-297,), (2.9000325280299273e-39,), 
(-1.077534929323113e-264,), (-1.1847952892216515e+137,), (nan,), 
(7.849390806334983e+226,), (-1.831402251805194e+65,), 
(-2.664533698035492e+203,), (-2.2385155698231885e+285,), 
(-2.3016388448634844e-155,), (-9.607772864590422e+217,), 
(3.437191836077251e+209,), (1.9846569552093057e-137,), 
(-3.010452936419635e-233,), (1.4309793775440402e-87,), 
(-2.9383643865423363e-103,), (-4.696878567317712e-162,), 
(8.391630779050713e-135,), (nan,), (-3.3885098786542755e-128,), 
(-4.5154178008513483e-122,), (nan,), (nan,), (2.187766760184779e+306,), 
(7.679268835670585e+223,), (6.3131466321042515e+153,), 
(1.779652973678931e+173,), (9.247723870123388e-295,), (5.891823952773268e+98,), 
(inf,), (1.9042708096454302e+195,), (-3.085825028509117e+74,), 
(-1.9569489404314425e+128,), (2.0738138203216883e+201,), (inf,), 
(2.5212410617263588e-282,), (-2.646144697462316e-35,), 
(-3.468683249247593e-196,), (nan,), (None,), (nan,), (1.822129180806602e-245,), 
(5.211702553315461e-259,), (-1.0,), (-5.682293414619055e+46,), 
(-4.585039307326895e+166,), (-5.936844510098297e-82,), (-5234708055733.116,), 
(4920675036.053339,), (None,), (4.4501477170144023e-308,), 
(2.176024662699802e-210,), (-5.046677974902737e+132,), 
(-5.490780063080251e-09,), (1.703824427218836e-55,), 
(-1.1961155424160076e+102,), (1.4403274475565667e+41,), (None,), 
(5.4470705929955455e-86,), (5.120795466142678e-215,), 
(-9.01991342808203e+282,), (4.051866849943636e-254,), (-3588518231990.927,), 
(-1.8891559842111865e+63,), (3.4543959813437507e-304,), 
(-7.590734560275502e-63,), (9.376528689861087e+117,), 
(-2.1696969883753554e-292,), (7.227411393136537e+206,), 
(-2.428999624265911e-293,), (5.741383583382542e-14,), 
(-1.4882040107841963e+286,), (2.1973064836362255e-159,), 
(0.028096279323357867,), (8.475809563703283e-64,), (3.002803065141241e-139,), 
(-1.1041009815645263e+203,), (1.8461539468514548e-225,), 
(-5.620339412794757e-251,), (3.5103766991437114e-60,), 
(2.4925669515657655e+165,), (3.217759099462207e+108,), 
(-8.796717685143486e+203,), (2.037360925124577e+292,), 
(-6.542279108216022e+206,), (-7.951172614280046e-74,), 
(6.226527569272003e+152,), (-5.673977270111637e-84,), 
(-1.0186016078084965e-281,), (1.7976931348623157e+308,), 
(4.205809391029644e+137,), (-9.871721037428167e+119,), 

[jira] [Created] (SPARK-45243) RADIX sort is not stable and can produce different results for first/collect_list aggs

2023-09-20 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-45243:
---

 Summary: RADIX sort is not stable and can produce different 
results for first/collect_list aggs
 Key: SPARK-45243
 URL: https://issues.apache.org/jira/browse/SPARK-45243
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Robert Joseph Evans


I just set the version as 3.3.0. I think it is all versions, but I have not 
tested it to be sure.

A simple query like
{code:java}
spark.read.parquet("/tmp/TEST").groupBy("a").agg(first(col("b"))).show()
{code}
can produce different results depending on whether RADIX sort is enabled or not. In 
this case "a" is a LONG and "b" is a STRING. The STRING forces the aggregation 
to be a sort-based aggregation, and "a" being a long causes the sort to default to 
a RADIX sort.

In practice first, last, and collect_list are not deterministic because the 
order in which shuffle data arrives at a task is a race, so the order in which the 
rows arrive after the second stage of the aggregation is totally up in the air; 
first and last might as well be called pick_one. But in this case the data is 
small enough that there is a single partition, so it should be deterministic. 
But it is not.
{code:java}
scala> spark.read.parquet("/tmp/TEST").show(100, false)
+--------------------+----+
|a                   |b   |
+--------------------+----+
|4080731634774120135 |HH  |
|-7996385019137306797|AA  |
|4891386765580059730 |BI  |
|-2578026341565473682|DE  |
|-7264635988756013877|CH  |
|5656737394922367923 |AG  |
|-6183011807271780569|BG  |
|109827782918242415  |CD  |
|-4058328039203991995|FA  |
|null                |FG  |
|4080731634774120135 |ID  |
|-7996385019137306797|GG  |
|4891386765580059730 |AC  |
|-2578026341565473682|null|
|-7264635988756013877|HF  |
|5656737394922367923 |II  |
|-6183011807271780569|FC  |
|109827782918242415  |DI  |
|-4058328039203991995|IH  |
|null                |FE  |
|4080731634774120135 |HA  |
|-7996385019137306797|ID  |
|4891386765580059730 |GI  |
|-2578026341565473682|GB  |
|-7264635988756013877|EC  |
|5656737394922367923 |DA  |
|-6183011807271780569|BB  |
|109827782918242415  |AE  |
|-4058328039203991995|FE  |
|null                |AE  |
|4080731634774120135 |BC  |
|-7996385019137306797|HF  |
+--------------------+----+

scala> spark.read.parquet("/tmp/TEST").groupBy("a").agg(first(col("b"))).show()
+--------------------+--------+
|                   a|first(b)|
+--------------------+--------+
|                null|      FG|
|-7996385019137306797|      GG|
|-7264635988756013877|      CH|
|-6183011807271780569|      BG|
|-4058328039203991995|      FA|
|-2578026341565473682|      DE|
|  109827782918242415|      CD|
| 4080731634774120135|      HH|
| 4891386765580059730|      AC|
| 5656737394922367923|      AG|
+--------------------+--------+

scala> spark.conf.set("spark.sql.sort.enableRadixSort", false)

scala> spark.read.parquet("/tmp/TEST").groupBy("a").agg(first(col("b"))).show()
+--------------------+--------+
|                   a|first(b)|
+--------------------+--------+
|                null|      FG|
|-7996385019137306797|      AA|
|-7264635988756013877|      CH|
|-6183011807271780569|      BG|
|-4058328039203991995|      FA|
|-2578026341565473682|      DE|
|  109827782918242415|      CD|
| 4080731634774120135|      HH|
| 4891386765580059730|      BI|
| 5656737394922367923|      AG|
+--------------------+--------+
{code}
Here the value for -7996385019137306797 changed from GG with radix sort on to 
AA with it off.  AA is technically correct, because it appears on line 2 of the 
input whereas GG shows up on line 12. 

 

I honestly don't know if Spark expects the sort to be stable or not. Looking at 
the code, SortExec and UnsafeExternalSorter do not make any claims about being 
stable, and https://issues.apache.org/jira/browse/SPARK-23973 indicates that 
sort is not stable, so this might just work as designed.  I just find it odd 
that in most cases it is stable, so I guess that was just by accident.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-44500) parse_url treats key as regular expression

2023-07-20 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-44500:
---

 Summary: parse_url treats key as regular expression
 Key: SPARK-44500
 URL: https://issues.apache.org/jira/browse/SPARK-44500
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.1, 3.4.0, 3.3.0, 3.2.0
Reporter: Robert Joseph Evans


To be clear I am not 100% sure that this is a bug. It might be a feature, but I 
don't see anywhere that it is used as a feature. If it is a feature it really 
should be documented, because there are pitfalls. If it is a bug it should be 
fixed because it is really confusing and it is simple to shoot yourself in the 
foot.

```scala
> val urls = Seq("http://foo/bar?abc=BAD&a.c=GOOD", 
> "http://foo/bar?a.c=GOOD&abc=BAD").toDF
> urls.selectExpr("parse_url(value, 'QUERY', 'a.c')").show(false)

+----------------------------+
|parse_url(value, QUERY, a.c)|
+----------------------------+
|BAD                         |
|GOOD                        |
+----------------------------+

> urls.selectExpr("parse_url(value, 'QUERY', 'a[c')").show(false)
java.util.regex.PatternSyntaxException: Unclosed character class near index 15
(&|^)a[c=([^&]*)
   ^
  at java.util.regex.Pattern.error(Pattern.java:1969)
  at java.util.regex.Pattern.clazz(Pattern.java:2562)
  at java.util.regex.Pattern.sequence(Pattern.java:2077)
  at java.util.regex.Pattern.expr(Pattern.java:2010)
  at java.util.regex.Pattern.compile(Pattern.java:1702)
  at java.util.regex.Pattern.<init>(Pattern.java:1352)
  at java.util.regex.Pattern.compile(Pattern.java:1028)

```

The simple fix is to quote the key when making the pattern.

```scala
  private def getPattern(key: UTF8String): Pattern = {
    Pattern.compile(REGEXPREFIX + Pattern.quote(key.toString) + REGEXSUBFIX)
  }
```
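
A quick, Spark-independent check of that fix (the prefix/suffix constants below only mirror the snippet above; this is not the Spark code itself):

```scala
import java.util.regex.Pattern

val REGEXPREFIX = "(&|^)"
val REGEXSUBFIX = "=([^&]*)"
def getPattern(key: String): Pattern =
  Pattern.compile(REGEXPREFIX + Pattern.quote(key) + REGEXSUBFIX)

// '.' is now literal, so the key 'a.c' no longer matches 'abc'...
val m = getPattern("a.c").matcher("abc=BAD&a.c=GOOD")
println(if (m.find()) m.group(2) else null) // GOOD (was BAD without quoting)

// ...and 'a[c' compiles instead of throwing PatternSyntaxException.
println(getPattern("a[c").pattern())
```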



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-42898) Cast from string to date and date to string say timezone is needed, but it is not used

2023-03-22 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-42898:
---

 Summary: Cast from string to date and date to string say timezone 
is needed, but it is not used
 Key: SPARK-42898
 URL: https://issues.apache.org/jira/browse/SPARK-42898
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Robert Joseph Evans


This is really minor, but SPARK-35581 removed the need for a timezone when 
casting from a `StringType` to a `DateType`, but the patch didn't update the 
`needsTimeZone` function to indicate that it was no longer required.

Currently, casting from a DateType to a StringType also says that it needs the 
timezone, but it only uses the `DateFormatter` with its default parameters, 
which do not use the time zone at all.

I think this can be fixed with just a two line change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-41218) ParquetTable reports it supports negative scale decimal values

2022-11-21 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-41218:
---

 Summary: ParquetTable reports it supports negative scale decimal 
values
 Key: SPARK-41218
 URL: https://issues.apache.org/jira/browse/SPARK-41218
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0, 3.4.0
Reporter: Robert Joseph Evans


This is likely very minor, but {{ParquetTable}} says it supports all 
{{AtomicTypes}}

https://github.com/apache/spark/blob/07427b854be58810bd485c00c5e5c576d5aa404e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala#L52

But the Parquet spec and code say that negative scale decimal values are not 
supported, and the code will throw an exception if you try to store one.

{code}
 java.lang.IllegalArgumentException: Invalid DECIMAL scale: -2
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57)
at 
org.apache.parquet.schema.Types$BasePrimitiveBuilder.decimalMetadata(Types.java:616)
at 
org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:443)
at 
org.apache.parquet.schema.Types$BasePrimitiveBuilder.build(Types.java:338)

{code}

We should update the ParquetTable code to be accurate in this respect.
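
A hedged repro sketch (it assumes the legacy config that permits negative-scale decimals, and the output path is only an example): the V2 ParquetTable accepts the schema, but parquet-mr rejects the scale at write time with the error shown above.

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Negative-scale decimal types are only constructible with this legacy flag.
spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", true)

val schema = StructType(Seq(StructField("d", DecimalType(5, -2))))
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(Seq(Row(BigDecimal(12300)))),
  schema)

// Expected to fail with "Invalid DECIMAL scale: -2" from parquet-mr.
df.write.mode("overwrite").parquet("/tmp/negative_scale_decimal")
{code}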



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40280) Failure to create parquet predicate push down for ints and longs on some valid files

2022-08-30 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-40280:
---

 Summary: Failure to create parquet predicate push down for ints 
and longs on some valid files
 Key: SPARK-40280
 URL: https://issues.apache.org/jira/browse/SPARK-40280
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0, 3.2.0, 3.1.0, 3.4.0
Reporter: Robert Joseph Evans


The [parquet 
format|https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#signed-integers]
 specification states that...

bq. {{{}INT(8, true){}}}, {{{}INT(16, true){}}}, and {{INT(32, true)}} must 
annotate an {{int32}} primitive type and {{INT(64, true)}} must annotate an 
{{int64}} primitive type. {{INT(32, true)}} and {{INT(64, true)}} are implied 
by the {{int32}} and {{int64}} primitive types if no other annotation is 
present and should be considered optional.

But the code inside of 
[ParquetFilters.scala|https://github.com/apache/spark/blob/296fe49ec855ac8c15c080e7bab6d519fe504bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L125-L126]
 requires that {{int32}} and {{int64}} columns have no annotation. If 
there is an annotation on those columns and they are part of a predicate 
push down, the hard-coded types will not match and the corresponding filter 
ends up being {{None}}.

This can be a huge performance penalty for a valid parquet file.

I am happy to provide files that show the issue if needed for testing.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40129) Decimal multiply can produce the wrong answer because it rounds twice

2022-08-17 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-40129:
---

 Summary: Decimal multiply can produce the wrong answer because it 
rounds twice
 Key: SPARK-40129
 URL: https://issues.apache.org/jira/browse/SPARK-40129
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0, 3.2.0, 3.4.0
Reporter: Robert Joseph Evans


This looks like it has been around for a long time, but I have reproduced it in 
3.2.0+

The example here is multiplying Decimal(38, 10) by another Decimal(38, 10), but 
I think it can be reproduced with other number combinations, and possibly with 
divide too.
{code:java}
Seq("9173594185998001607642838421.5479932913").toDF.selectExpr("CAST(value as 
DECIMAL(38,10)) as a").selectExpr("a * CAST(-12 as 
DECIMAL(38,10))").show(truncate=false)
{code}
This produces an answer in Spark of {{-110083130231976019291714061058.575920}}, 
but if I do the calculation in regular Java BigDecimal I get 
{{-110083130231976019291714061058.575919}}.
{code:java}
BigDecimal l = new BigDecimal("9173594185998001607642838421.5479932913");
BigDecimal r = new BigDecimal("-12.00");
BigDecimal prod = l.multiply(r);
BigDecimal rounded_prod = prod.setScale(6, RoundingMode.HALF_UP);
{code}
Spark does essentially all of the same operations, but it uses Decimal to do it 
instead of Java's BigDecimal directly. Spark, by way of Decimal, will set a 
MathContext for the multiply operation that has a max precision of 38 and will 
do half-up rounding. That means that the result of the multiply operation in 
Spark is {{-110083130231976019291714061058.57591950}}, but for the Java 
BigDecimal code the result is 
{{-110083130231976019291714061058.575919495600}}. Then in 
CheckOverflow for 3.2.0 and 3.3.0, or in just the regular Multiply expression in 
3.4.0, setScale is called (as a part of Decimal.setPrecision). At that point 
the already rounded number is rounded yet again, resulting in what is arguably a 
wrong answer from Spark.
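
The double rounding can be reproduced with plain {{java.math.BigDecimal}}, no Spark involved (the 38-digit HALF_UP MathContext below is meant to mirror what Decimal sets up for the multiply; treat it as an illustration rather than the exact Spark code path):
{code:java}
import java.math.{BigDecimal => JBigDecimal, MathContext, RoundingMode}

val l = new JBigDecimal("9173594185998001607642838421.5479932913")
val r = new JBigDecimal("-12.00")

// Round during the multiply (38 significant digits, HALF_UP) and then again
// when reducing to scale 6 -- two roundings, like the Decimal-based path.
val mc38 = new MathContext(38, RoundingMode.HALF_UP)
val roundedTwice = l.multiply(r, mc38).setScale(6, RoundingMode.HALF_UP)

// Round only once at the end -- plain BigDecimal arithmetic.
val roundedOnce = l.multiply(r).setScale(6, RoundingMode.HALF_UP)

println(roundedTwice) // -110083130231976019291714061058.575920
println(roundedOnce)  // -110083130231976019291714061058.575919
{code}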

I have not fully tested this, but it looks like we could just remove the 
MathContext entirely in Decimal, or set it to UNLIMITED. All of the decimal 
operations appear to have their own overflow and rounding anyways. If we want 
to potentially reduce the total memory usage, we could also set the max 
precision to 39 and truncate (round down) the result in the math context 
instead.  That would then let us round the result correctly in setPrecision 
afterwards.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Sorting of at least Decimal(20, 2) fails for some values near the max.

2022-08-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580434#comment-17580434
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

I put up a PR https://github.com/apache/spark/pull/37540

> Sorting of at least Decimal(20, 2) fails for some values near the max.
> --
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40089) Sorting of at least Decimal(20, 2) fails for some values near the max.

2022-08-16 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-40089:

Summary: Sorting of at least Decimal(20, 2) fails for some values near the 
max.  (was: Doring of at least Decimal(20, 2) fails for some values near the 
max.)

> Sorting of at least Decimal(20, 2) fails for some values near the max.
> --
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Doring of at least Decimal(20, 2) fails for some values near the max.

2022-08-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580377#comment-17580377
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

Never mind I figured out that there is a separate prefixComparator that does 
the same kinds of checks. But I have a fix that works, so I will put up a PR 
shortly.

> Doring of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Doring of at least Decimal(20, 2) fails for some values near the max.

2022-08-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17580360#comment-17580360
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

I have been trying to come up with a patch, but keep hitting some issues. I 
first tried to change 

{code}
 case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
{code}

to

{code}
 case dt: DecimalType if dt.precision - dt.scale < Decimal.MAX_LONG_DIGITS =>
{code}

So that we would bypass the overflow case entirely and use the Double prefix 
logic. But when I do that the negative values all come after the positive 
values when sorting ascending. So now I have a lot of other tests/debugging 
that I need to run to understand what is happening there, because I think 
I have found another bug. 

[~ulysses] I don't have a ton of time that I can devote to this right now. I 
will keep working towards a patch, but if you want to put up one, then I would 
love to see it. 

> Doring of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Doring of at least Decimal(20, 2) fails for some values near the max.

2022-08-15 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579897#comment-17579897
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

Looking at the code it appears that the prefix calculator has an overflow bug 
in it.

{code}
if (value.changePrecision(p, s)) value.toUnscaledLong else Long.MinValue
{code}

We are rounding up when changing the precision, and when the rounded value no 
longer fits we fall back to {{Long.MinValue}}, a.k.a. -9223372036854775808, 
which results in the failure. 
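
To make that concrete, here is a hedged illustration with Spark's {{Decimal}} (it assumes the prefix path reduces the value to scale 0 at {{MAX_LONG_DIGITS}} precision, as the snippet above suggests; it is not the prefix code itself):

{code}
import org.apache.spark.sql.types.Decimal

// 18 integral digits is the most that fits in MAX_LONG_DIGITS. Rounding the
// fractional .50 up produces a 19-digit value, so changePrecision reports
// failure and the prefix computation falls back to Long.MinValue.
val bad = Decimal(BigDecimal("999999999999999999.50"), 20, 2)
println(bad.changePrecision(18, 0)) // false -> prefix becomes Long.MinValue

val ok = Decimal(BigDecimal("999999999999999999.49"), 20, 2)
println(ok.changePrecision(18, 0)) // true -> normal prefix
{code}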

> Doring of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Doring of at least Decimal(20, 2) fails for some values near the max.

2022-08-15 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579892#comment-17579892
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

It sure looks like it is related to the prefix calculator. I think it is 
overflowing somehow. I added some debugging into 3.2.0 and I got back

{code}
22/08/15 20:17:58 ERROR SortExec: PREFIX FOR 999999999999999999.99 IS false 
-9223372036854775808
{code}

The prefix should not be negative for non-negative values.


> Doring of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "999999999999999999.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[999999999999999999.49]}}, and {{[999999999999999999.99]}} is near the top 
> near {{[-999999999999999999.99]}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-40089) Doring of at least Decimal(20, 2) fails for some values near the max.

2022-08-15 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17579887#comment-17579887
 ] 

Robert Joseph Evans commented on SPARK-40089:
-

I have been trying to debug this and it does not look like it is related to the 
partitioner. I can run with a single shuffle partition and I get the same 
results. Not sure if the prefix calculation is doing this or what.

> Doring of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "99.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[99.49]}}, and {{[99.99]}} is near the top, 
> next to {{[-99.99]}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40089) Sorting of at least Decimal(20, 2) fails for some values near the max.

2022-08-15 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-40089:

Attachment: input.parquet

> Sorting of at least Decimal(20, 2) fails for some values near the max.
> -
>
> Key: SPARK-40089
> URL: https://issues.apache.org/jira/browse/SPARK-40089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0, 3.3.0, 3.4.0
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: input.parquet
>
>
> I have been doing some testing with Decimal values for the RAPIDS Accelerator 
> for Apache Spark. I have been trying to add in new corner cases and when I 
> tried to enable the maximum supported value for a sort I started to get 
> failures.  On closer inspection it looks like the CPU is sorting things 
> incorrectly.  Specifically anything that is "99.50" or above 
> is placed as a chunk in the wrong location in the outputs.
>  In local mode with 12 tasks.
> {code:java}
> spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
>  {code}
>  
> Here you will notice that the last entry printed is 
> {{[99.49]}}, and {{[99.99]}} is near the top, 
> next to {{[-99.99]}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-40089) Sorting of at least Decimal(20, 2) fails for some values near the max.

2022-08-15 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-40089:
---

 Summary: Sorting of at least Decimal(20, 2) fails for some values 
near the max.
 Key: SPARK-40089
 URL: https://issues.apache.org/jira/browse/SPARK-40089
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0, 3.2.0, 3.4.0
Reporter: Robert Joseph Evans


I have been doing some testing with Decimal values for the RAPIDS Accelerator 
for Apache Spark. I have been trying to add in new corner cases and when I 
tried to enable the maximum supported value for a sort I started to get 
failures.  On closer inspection it looks like the CPU is sorting things 
incorrectly.  Specifically anything that is "99.50" or above is 
placed as a chunk in the wrong location in the outputs.

 In local mode with 12 tasks.
{code:java}
spark.read.parquet("input.parquet").orderBy(col("a")).collect.foreach(System.err.println)
 {code}
 

Here you will notice that the last entry printed is 
{{[99.49]}}, and {{[99.99]}} is near the top, 
next to {{[-99.99]}}.





--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-39031) NaN != NaN in pivot

2022-04-26 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-39031:
---

 Summary: NaN != NaN in pivot
 Key: SPARK-39031
 URL: https://issues.apache.org/jira/browse/SPARK-39031
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Robert Joseph Evans


I know this is an odd corner case, but NaN != NaN in pivot, which is 
inconsistent with other places in Spark.

 

{code}

scala> Seq(Double.NaN, Double.NaN, 1.0, Double.NaN, 1.0, 
1.0).toDF.groupBy("value").count.show()
+-+-+                                                                   
|value|count|
+-+-+
|  NaN|    3|
|  1.0|    3|
+-+-+


scala> Seq(Double.NaN, Double.NaN, 1.0, Double.NaN, 1.0, 
1.0).toDF.groupBy("value").pivot("value").count.show()
+-+++
|value| 1.0| NaN|
+-+++
|  NaN|null|null|
|  1.0|   3|null|
+-+++

{code}

 

It looks like the issue is that PivotFirst uses a HashMap when the pivotColumn 
is an AtomicType, but a TreeMap with an interpretedOrdering for other types. If 
we made DoubleType and FloatType use the TreeMap then the equality checks would 
be correct. But I am not able to really test that, because if I try to pivot on 
an array or struct I get the analysis exceptions shown below.

 

{code}
scala> Seq(Double.NaN, Double.NaN, 1.0, Double.NaN, 1.0, 
1.0).toDF.selectExpr("value", "struct(value) as 
ar_value").groupBy("value").pivot("ar_value").count.show()
java.lang.RuntimeException: Unsupported literal type class 
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [1.0]
  at 
org.apache.spark.sql.errors.QueryExecutionErrors$.literalTypeUnsupportedError(QueryExecutionErrors.scala:182)
  at 
org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:101)

{code}

 
{code}
 scala> Seq(Double.NaN, Double.NaN, 1.0, Double.NaN, 1.0, 
1.0).toDF.selectExpr("value", "array(value) as 
ar_value").groupBy("value").pivot("ar_value").count.show()
org.apache.spark.sql.AnalysisException: Invalid pivot value '[1.0]': value data 
type array does not match pivot column data type array
  at 
org.apache.spark.sql.errors.QueryCompilationErrors$.pivotValDataTypeMismatchError(QueryCompilationErrors.scala:85)
  at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolvePivot$$anonfun$apply$10.$anonfun$applyOrElse$21(Analyzer.scala:762)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
{code}
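
As a minimal illustration of why an equality-based map and an ordering-based map can 
disagree here (this is plain JVM/Scala semantics, not the PivotFirst code itself): 
primitive NaN is never equal to itself, but a total ordering treats two NaNs as equal.

{code:scala}
// Plain Scala/JVM semantics, shown only to illustrate the HashMap vs TreeMap point above.
val a = Double.NaN
val b = Double.NaN

println(a == b)                            // false: primitive comparison says NaN != NaN
println(java.lang.Double.compare(a, b))    // 0: the total ordering treats the two NaNs as equal
println(a.equals(b))                       // true: boxed equality also treats NaN as equal to itself
{code}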



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38955) from_csv can corrupt surrounding lines if a lineSep is in the data

2022-04-20 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17525310#comment-17525310
 ] 

Robert Joseph Evans commented on SPARK-38955:
-

Conceptually I am fine if we want to remove all line separators in from_csv. 
That is what I would expect to happen, and that is what happens with from_json.

{code}
Seq[String]("{'a': 1\n}","{'a': \n3, 'b': 'test\n3'}", "{'a'\n: 4}", 
null).toDF.select(col("value"), from_json(col("value"), 
StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
Map[String,String]("allowUnquotedControlChars" -> "true"))).show(truncate=false)
+--++
|value |from_json(value)|
+--++
|{'a': 1\n}|{1, null}   |
|{'a': \n3, 'b': 'test\n3'}|{3, test\n3}|
|{'a'\n: 4}|{4, null}   |
|null  |null|
+--++
{code}


But there is no way to turn off line separators in the CSV parser.

https://github.com/uniVocity/univocity-parsers/blob/7e7d1b3c0a3dceaed4a8413875eb1500f2a028ec/src/main/java/com/univocity/parsers/common/Format.java#L54-L65

So implementing the proposed fix may be difficult. Replacing the default 
separator '\n' with another one like '\0' might be okay, but I know of people 
with '\0' in their data, so that would not truly fix the problem.

An alternative might be to clear the state of the CSV parser after each row of 
input, i.e. read all of the tokens out of the parser after each row. The '\n' 
would still be parsed, so the output of a single row that contains the line 
separator is still not ideal, but at least it would not corrupt the output of a 
good row after it.
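
For reference, a minimal sketch of the limitation referenced in the link above (class 
and method names are from univocity-parsers and are assumed to match the linked 
version): the format only lets you choose a line separator, there is no setting to 
disable line-ending handling entirely.

{code:scala}
// Hedged sketch, not Spark code: univocity lets you swap the line separator but not remove it.
import com.univocity.parsers.csv.CsvParserSettings

val settings = new CsvParserSettings()
settings.getFormat.setLineSeparator(":")  // replaces '\n' with ':', but some separator always applies
{code}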

> from_csv can corrupt surrounding lines if a lineSep is in the data
> --
>
> Key: SPARK-38955
> URL: https://issues.apache.org/jira/browse/SPARK-38955
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>
> I don't know how critical this is. I was doing some general testing to 
> understand {{from_csv}} and found that if I happen to have a {{lineSep}} in 
> the input data, the next row appears to be corrupted. 
> {{multiLine}} does not appear to fix it. Because this is data corruption I am 
> inclined to mark this as CRITICAL or BLOCKER, but it is an odd corner case so 
> I'm not going to set it myself.
> {code}
> Seq[String]("1,\n2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
> null).toDF.select(col("value"), from_csv(col("value"), 
> StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
> Map[String,String]())).show()
> +--+---+
> | value|from_csv(value)|
> +--+---+
> |   1,\n2,3,4,5|  {1, null}|
> |6,7,8,9,10|  {null, 8}|
> |11,12,13,14,15|   {11, 12}|
> |  null|   null|
> +--+---+
> {code}
> {code}
> Seq[String]("1,:2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
> null).toDF.select(col("value"), from_csv(col("value"), 
> StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
> Map[String,String]("lineSep" -> ":"))).show()
> +--+---+
> | value|from_csv(value)|
> +--+---+
> |1,:2,3,4,5|  {1, null}|
> |6,7,8,9,10|  {null, 8}|
> |11,12,13,14,15|   {11, 12}|
> |  null|   null|
> +--+---+
> {code}
> {code}
> Seq[String]("1,\n2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
> null).toDF.select(col("value"), from_csv(col("value"), 
> StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
> Map[String,String]("lineSep" -> ":"))).show()
> +--+---+
> | value|from_csv(value)|
> +--+---+
> |   1,\n2,3,4,5|   {1, \n2}|
> |6,7,8,9,10| {6, 7}|
> |11,12,13,14,15|   {11, 12}|
> |  null|   null|
> +--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38955) from_csv can corrupt surrounding lines if a lineSep is in the data

2022-04-19 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-38955:
---

 Summary: from_csv can corrupt surrounding lines if a lineSep is in 
the data
 Key: SPARK-38955
 URL: https://issues.apache.org/jira/browse/SPARK-38955
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Robert Joseph Evans


I don't know how critical this is. I was doing some general testing to 
understand {{from_csv}} and found that if I happen to have a {{lineSep}} in the 
input data, the next row appears to be corrupted. 
{{multiLine}} does not appear to fix it. Because this is data corruption I am 
inclined to mark this as CRITICAL or BLOCKER, but it is an odd corner case so 
I'm not going to set it myself.

{code}
Seq[String]("1,\n2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
null).toDF.select(col("value"), from_csv(col("value"), 
StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
Map[String,String]())).show()
+--+---+
| value|from_csv(value)|
+--+---+
|   1,\n2,3,4,5|  {1, null}|
|6,7,8,9,10|  {null, 8}|
|11,12,13,14,15|   {11, 12}|
|  null|   null|
+--+---+
{code}

{code}
Seq[String]("1,:2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
null).toDF.select(col("value"), from_csv(col("value"), 
StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
Map[String,String]("lineSep" -> ":"))).show()
+--+---+
| value|from_csv(value)|
+--+---+
|1,:2,3,4,5|  {1, null}|
|6,7,8,9,10|  {null, 8}|
|11,12,13,14,15|   {11, 12}|
|  null|   null|
+--+---+
{code}

{code}
Seq[String]("1,\n2,3,4,5","6,7,8,9,10", "11,12,13,14,15", 
null).toDF.select(col("value"), from_csv(col("value"), 
StructType(Seq(StructField("a", LongType), StructField("b", StringType))), 
Map[String,String]("lineSep" -> ":"))).show()
+--+---+
| value|from_csv(value)|
+--+---+
|   1,\n2,3,4,5|   {1, \n2}|
|6,7,8,9,10| {6, 7}|
|11,12,13,14,15|   {11, 12}|
|  null|   null|
+--+---+
{code}




--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38604) ceil and floor return different types when called from scala than sql

2022-03-19 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17509265#comment-17509265
 ] 

Robert Joseph Evans commented on SPARK-38604:
-

I marked this as critical because it technically is a breaking change for all 
but the SQL API. I could see blocker because it could be considered data 
corruption, but only under a really loose definition of that. Could someone 
assign this to me? I have a very simple patch that fixes it, but I don't appear 
to have permission to assign the issue to myself.

> ceil and floor return different types when called from scala than sql
> -
>
> Key: SPARK-38604
> URL: https://issues.apache.org/jira/browse/SPARK-38604
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Robert Joseph Evans
>Priority: Critical
>
> In Spark 3.3.0, SPARK-37475 
> ([PR|https://github.com/apache/spark/pull/34729]) added support for a scale 
> parameter to floor and ceil. There was 
> [discussion|https://github.com/apache/spark/pull/34729#discussion_r761157050] 
> about potential incompatibilities, specifically with respect to the return 
> types. It looks like it was 
> [decided|https://github.com/apache/spark/pull/34729#discussion_r767446855] to 
> keep the old behavior if no scale parameter was passed in, but use the new 
> functionality if a scale is passed in.
>  
> But the Scala API was not updated to do the same thing as the SQL API.
> {code:scala}
> scala> spark.range(1).selectExpr("id", "ceil(id) as one_arg_sql", "ceil(id, 
> 0) as two_arg_sql").select(col("*"), ceil(col("id")).alias("one_arg_func"), 
> ceil(col("id"), lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: long (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(20,0) (nullable = true)
>  |-- one_arg_func: decimal(20,0) (nullable = true)
>  |-- two_arg_func: decimal(20,0) (nullable = true)
>  
> scala> spark.range(1).selectExpr("cast(id as double) as id").selectExpr("id", 
> "ceil(id) as one_arg_sql", "ceil(id, 0) as two_arg_sql").select(col("*"), 
> ceil(col("id")).alias("one_arg_func"), ceil(col("id"), 
> lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: double (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(30,0) (nullable = true)
>  |-- one_arg_func: decimal(30,0) (nullable = true)
>  |-- two_arg_func: decimal(30,0) (nullable = true) {code}
> And because the Python code calls into this too, it has the same problem. 
> I suspect that the Java and R APIs expose it as well, but I didn't check.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38604) ceil and floor return different types when called from scala than sql

2022-03-19 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-38604:

Priority: Blocker  (was: Major)

> ceil and floor return different types when called from scala than sql
> -
>
> Key: SPARK-38604
> URL: https://issues.apache.org/jira/browse/SPARK-38604
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>
> In Spark 3.3.0, SPARK-37475 
> ([PR|https://github.com/apache/spark/pull/34729]) added support for a scale 
> parameter to floor and ceil. There was 
> [discussion|https://github.com/apache/spark/pull/34729#discussion_r761157050] 
> about potential incompatibilities, specifically with respect to the return 
> types. It looks like it was 
> [decided|https://github.com/apache/spark/pull/34729#discussion_r767446855] to 
> keep the old behavior if no scale parameter was passed in, but use the new 
> functionality if a scale is passed in.
>  
> But the Scala API was not updated to do the same thing as the SQL API.
> {code:scala}
> scala> spark.range(1).selectExpr("id", "ceil(id) as one_arg_sql", "ceil(id, 
> 0) as two_arg_sql").select(col("*"), ceil(col("id")).alias("one_arg_func"), 
> ceil(col("id"), lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: long (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(20,0) (nullable = true)
>  |-- one_arg_func: decimal(20,0) (nullable = true)
>  |-- two_arg_func: decimal(20,0) (nullable = true)
>  
> scala> spark.range(1).selectExpr("cast(id as double) as id").selectExpr("id", 
> "ceil(id) as one_arg_sql", "ceil(id, 0) as two_arg_sql").select(col("*"), 
> ceil(col("id")).alias("one_arg_func"), ceil(col("id"), 
> lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: double (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(30,0) (nullable = true)
>  |-- one_arg_func: decimal(30,0) (nullable = true)
>  |-- two_arg_func: decimal(30,0) (nullable = true) {code}
> And because the Python code calls into this too, it has the same problem. 
> I suspect that the Java and R APIs expose it as well, but I didn't check.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38604) ceil and floor return different types when called from scala than sql

2022-03-19 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-38604:

Priority: Critical  (was: Blocker)

> ceil and floor return different types when called from scala than sql
> -
>
> Key: SPARK-38604
> URL: https://issues.apache.org/jira/browse/SPARK-38604
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Robert Joseph Evans
>Priority: Critical
>
> In Spark 3.3.0, SPARK-37475 
> ([PR|https://github.com/apache/spark/pull/34729]) added support for a scale 
> parameter to floor and ceil. There was 
> [discussion|https://github.com/apache/spark/pull/34729#discussion_r761157050] 
> about potential incompatibilities, specifically with respect to the return 
> types. It looks like it was 
> [decided|https://github.com/apache/spark/pull/34729#discussion_r767446855] to 
> keep the old behavior if no scale parameter was passed in, but use the new 
> functionality if a scale is passed in.
>  
> But the Scala API was not updated to do the same thing as the SQL API.
> {code:scala}
> scala> spark.range(1).selectExpr("id", "ceil(id) as one_arg_sql", "ceil(id, 
> 0) as two_arg_sql").select(col("*"), ceil(col("id")).alias("one_arg_func"), 
> ceil(col("id"), lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: long (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(20,0) (nullable = true)
>  |-- one_arg_func: decimal(20,0) (nullable = true)
>  |-- two_arg_func: decimal(20,0) (nullable = true)
>  
> scala> spark.range(1).selectExpr("cast(id as double) as id").selectExpr("id", 
> "ceil(id) as one_arg_sql", "ceil(id, 0) as two_arg_sql").select(col("*"), 
> ceil(col("id")).alias("one_arg_func"), ceil(col("id"), 
> lit(0)).alias("two_arg_func")).printSchema
> root
>  |-- id: double (nullable = false)
>  |-- one_arg_sql: long (nullable = true)
>  |-- two_arg_sql: decimal(30,0) (nullable = true)
>  |-- one_arg_func: decimal(30,0) (nullable = true)
>  |-- two_arg_func: decimal(30,0) (nullable = true) {code}
> And because the Python code calls into this too, it has the same problem. 
> I suspect that the Java and R APIs expose it as well, but I didn't check.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-38604) ceil and floor return different types when called from scala than sql

2022-03-19 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-38604:
---

 Summary: ceil and floor return different types when called from 
scala than sql
 Key: SPARK-38604
 URL: https://issues.apache.org/jira/browse/SPARK-38604
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.3.0
Reporter: Robert Joseph Evans


In Spark 3.3.0, SPARK-37475 
([PR|https://github.com/apache/spark/pull/34729]) added support for a scale 
parameter to floor and ceil. There was 
[discussion|https://github.com/apache/spark/pull/34729#discussion_r761157050] 
about potential incompatibilities, specifically with respect to the return 
types. It looks like it was 
[decided|https://github.com/apache/spark/pull/34729#discussion_r767446855] to 
keep the old behavior if no scale parameter was passed in, but use the new 
functionality if a scale is passed in.

 

But the Scala API was not updated to do the same thing as the SQL API.
{code:scala}
scala> spark.range(1).selectExpr("id", "ceil(id) as one_arg_sql", "ceil(id, 0) 
as two_arg_sql").select(col("*"), ceil(col("id")).alias("one_arg_func"), 
ceil(col("id"), lit(0)).alias("two_arg_func")).printSchema
root
 |-- id: long (nullable = false)
 |-- one_arg_sql: long (nullable = true)
 |-- two_arg_sql: decimal(20,0) (nullable = true)
 |-- one_arg_func: decimal(20,0) (nullable = true)
 |-- two_arg_func: decimal(20,0) (nullable = true)
 

scala> spark.range(1).selectExpr("cast(id as double) as id").selectExpr("id", 
"ceil(id) as one_arg_sql", "ceil(id, 0) as two_arg_sql").select(col("*"), 
ceil(col("id")).alias("one_arg_func"), ceil(col("id"), 
lit(0)).alias("two_arg_func")).printSchema
root
 |-- id: double (nullable = false)
 |-- one_arg_sql: long (nullable = true)
 |-- two_arg_sql: decimal(30,0) (nullable = true)
 |-- one_arg_func: decimal(30,0) (nullable = true)
 |-- two_arg_func: decimal(30,0) (nullable = true) {code}
And because the Python code calls into this too, it has the same problem. I 
suspect that the Java and R APIs expose it as well, but I didn't check.

 

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38577) Interval types are not truncated to the expected endField when creating a DataFrame via Duration

2022-03-17 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17508269#comment-17508269
 ] 

Robert Joseph Evans commented on SPARK-38577:
-

This is especially problematic because it is really inconsistent.
{code:scala}
val data = Seq(Row(Duration.ofDays(1).plusSeconds(1)), 
Row(Duration.ofDays(2).plusMinutes(2)))

val schema = StructType(Array(StructField("dur", 
DayTimeIntervalType(DayTimeIntervalType.DAY, DayTimeIntervalType.DAY

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.selectExpr("dur", "CAST(dur AS long)", "CAST('1970-1-1' as timestamp) + dur 
as ts").show()
++---+---+
|             dur|dur|                 ts|
++---+---+
|INTERVAL '1' DAY|  1|1970-01-02 00:00:01|
|INTERVAL '2' DAY|  2|1970-01-03 00:02:00|
++---+---+
 
df.select(col("dur"), 
col("dur").cast(DayTimeIntervalType()).alias("default_dur")).show(truncate = 
false)
++---+
|dur             |default_dur                        |
++---+
|INTERVAL '1' DAY|INTERVAL '1 00:00:01' DAY TO SECOND|
|INTERVAL '2' DAY|INTERVAL '2 00:02:00' DAY TO SECOND|
++---+
{code}
Casting the value to a type that drops precision truncates it, but casting to a 
type that increases precision, or doing math with it, does not.

Saving the data to Parquet keeps it exactly the same as what was input, but 
writing it to CSV truncates it.
{code:scala}
df.write.parquet("./tmp")

val df2 = spark.read.parquet("./tmp")

df2.selectExpr("dur", "CAST(dur AS long)", "CAST('1970-1-1' as timestamp) + dur 
as ts").show()
++---+---+
|             dur|dur|                 ts|
++---+---+
|INTERVAL '1' DAY|  1|1970-01-02 00:00:01|
|INTERVAL '2' DAY|  2|1970-01-03 00:02:00|
++---+---+

 

df.write.csv("./tmp_csv")

val df3 = spark.read.schema(schema).csv("./tmp_csv")

df3.selectExpr("dur", "CAST(dur AS long)", "CAST('1970-1-1' as timestamp) + dur 
as ts").show()
++---+---+
|             dur|dur|                 ts|
++---+---+
|INTERVAL '2' DAY|  2|1970-01-03 00:00:00|
|INTERVAL '1' DAY|  1|1970-01-02 00:00:00|
++---+---+
 {code}
This is all also true in the python API.

 

I would expect to get an error when importing the data, or to have Spark 
truncate/fix the data when it is imported, so that I don't get inconsistent and 
confusing results with it.

If this is working as intended, then I would like to see the behavior documented 
better.
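
In the meantime, a minimal caller-side workaround sketch (my own illustration, assuming 
DAY-only precision is what you actually want) is to truncate the {{Duration}} before it 
is stored:

{code:scala}
// Hedged workaround sketch: drop the sub-day part of the java.time.Duration explicitly,
// so the stored value matches the declared DayTimeIntervalType(DAY, DAY) field.
import java.time.Duration

val d = Duration.ofDays(1).plusSeconds(1)
val dayOnly = Duration.ofDays(d.toDays)   // exactly 1 day; the extra second is dropped up front
{code}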

> Interval types are not truncated to the expected endField when creating a 
> DataFrame via Duration
> 
>
> Key: SPARK-38577
> URL: https://issues.apache.org/jira/browse/SPARK-38577
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
> Environment: Spark 3.3.0 snapshot version
>  
>Reporter: chong
>Priority: Major
>
> *Problem:*
> ANSI interval types are stored as longs internally.
> The long value is not truncated to the expected endField when creating a 
> DataFrame via Duration.
>  
> *Reproduce:*
> Create a "day to day" interval; the seconds are not truncated, see the code below.
> The internal long is not {*}86400 * 100{*}, but {*}(86400 + 1) * 100{*}.
>  
> {code:java}
>   test("my test") {
> val data = Seq(Row(Duration.ofDays(1).plusSeconds(1)))
> val schema = StructType(Array(
>   StructField("t", DayTimeIntervalType(DayTimeIntervalType.DAY, 
> DayTimeIntervalType.DAY))
> ))
> val df = spark.createDataFrame(spark.sparkContext.parallelize(data), 
> schema)
> df.show()
>   } {code}
>  
>  
> After debugging, the {{endField}} is always {{SECOND}} in 
> {{durationToMicros}}, see below:
>  
> {code:java}
>   // IntervalUtils class
>   def durationToMicros(duration: Duration): Long = {
> durationToMicros(duration, DT.SECOND)   // always SECOND
>   }
>   def durationToMicros(duration: Duration, endField: Byte)
> {code}
> It seems it should use a different endField, which could be [DAY, HOUR, MINUTE, SECOND].
> Or Spark could throw an exception to avoid silently truncating.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-37024) Even more decimal overflow issues in average

2021-10-16 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-37024:
---

 Summary: Even more decimal overflow issues in average
 Key: SPARK-37024
 URL: https://issues.apache.org/jira/browse/SPARK-37024
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Robert Joseph Evans


As a part of trying to accelerate the {{Decimal}} average aggregation on a 
[GPU|https://nvidia.github.io/spark-rapids/] I noticed a few issues around 
overflow. I think all of these can be fixed by replacing {{Average}} with 
explicit {{Sum}}, {{Count}}, and {{Divide}} operations for decimal instead of 
implicitly doing them. But the extra checks would come with a performance cost.
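
For context, a minimal sketch of that explicit-operations workaround (my own 
illustration; {{df}} is assumed to have a decimal column {{v}} like the examples below):

{code:scala}
// Hedged sketch: compute the average explicitly so the summation goes through SUM's
// overflow handling instead of AVG's unchecked internal accumulation.
df.selectExpr("SUM(v) AS s", "COUNT(v) AS c")
  .selectExpr("s / c AS avg_v")
  .show(truncate = false)
{code}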

This is related to SPARK-35955, but goes quite a bit beyond it.
 # There are no ANSI overflow checks on the summation portion of average.
 # Nulls are inserted/overflow is detected on summation differently depending 
on code generation and parallelism.
 # If the input decimal precision is 11 or below all overflow checks are 
disabled, and the answer is wrong instead of null on overflow.

*Details:*

*there are no ANSI overflow checks on the summation portion.*
{code:scala}
scala> spark.conf.set("spark.sql.ansi.enabled", "true")

scala> spark.time(spark.range(201)
.repartition(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("AVG(v)")
.show(truncate = false))
+--+
|avg(v)|
+--+
|null  |
+--+

Time taken: 622 ms

scala> spark.time(spark.range(201)
.repartition(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("SUM(v)")
.show(truncate = false))
21/10/16 06:08:00 ERROR Executor: Exception in task 0.0 in stage 15.0 (TID 19)
java.lang.ArithmeticException: Overflow in sum of decimals.
...
{code}
*nulls are inserted on summation overflow differently depending on code 
generation and parallelism.*

Because there are no explicit overflow checks when doing the sum, a user can get 
very inconsistent results as to when a null is inserted on overflow. The checks 
really only take place when the {{Decimal}} value is converted and stored into 
an {{UnsafeRow}}. This happens when the values are shuffled, or after each 
operation if code gen is disabled. For a {{DECIMAL(32, 0)}} you can add 
1,000,000 max values before the summation overflows.
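
That 1,000,000 figure follows from the buffer width (a quick check, assuming the sum 
buffer for a {{DECIMAL(32, 0)}} input is {{DECIMAL(38, 0)}}, i.e. precision + 10 capped 
at 38):

{code:scala}
// Rough arithmetic check of the claim above: how many max-valued DECIMAL(32, 0) rows
// fit in a DECIMAL(38, 0) sum buffer before it overflows.
val maxInput  = BigInt(10).pow(32) - 1
val maxBuffer = BigInt(10).pow(38) - 1
println(maxBuffer / maxInput)   // 1000000
{code}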
{code:scala}
scala> spark.time(spark.range(100)
.coalesce(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("SUM(v) as s", "COUNT(v) as c", "AVG(v) as a")
.selectExpr("s", "c", "s/c as sum_div_count", "a")
.show(truncate = false))
+--+---+---+-+
|s |c  |sum_div_count   
   |a|
+--+---+---+-+
|00|100|.00|.|
+--+---+---+-+
Time taken: 241 ms

scala> spark.time(spark.range(200)
.coalesce(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("SUM(v) as s", "COUNT(v) as c", "AVG(v) as a")
.selectExpr("s", "c", "s/c as sum_div_count", "a")
.show(truncate = false))
++---+-+-+
|s   |c  |sum_div_count|a|
++---+-+-+
|null|200|null |.|
++---+-+-+
Time taken: 228 ms

scala> spark.time(spark.range(300)
.coalesce(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("SUM(v) as s", "COUNT(v) as c", "AVG(v) as a")
.selectExpr("s", "c", "s/c as sum_div_count", "a")
.show(truncate = false))
++---+-++
|s   |c  |sum_div_count|a   |
++---+-++
|null|300|null |null|
++---+-++
Time taken: 347 ms

scala> spark.conf.set("spark.sql.codegen.wholeStage", "false")
scala> spark.time(spark.range(101)
.coalesce(2)
.selectExpr("id", "CAST('' AS DECIMAL(32, 
0)) as v")
.selectExpr("SUM(v) as s", "COUNT(v) as c", "AVG(v) as a")
.selectExpr("s", "c", "s/c as sum_div_count", "a")
.show(truncate = false))
++---+-++
|s   |c 

[jira] [Commented] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-06-23 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368098#comment-17368098
 ] 

Robert Joseph Evans commented on SPARK-35563:
-

Or just do the overflow check on the int. I personally don't see a problem with 
Spark not supporting really large windows. I just want to avoid data 
corruption/loss.
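
To make the overflow-check option concrete, a minimal sketch (my own illustration, not 
the actual WindowExec code) of failing fast instead of silently wrapping:

{code:scala}
// Hedged sketch: keep the frame index as an Int but refuse to wrap around.
def nextRowNumber(current: Int): Int = {
  if (current == Int.MaxValue) {
    throw new ArithmeticException("window frame row index overflowed Int.MaxValue")
  }
  current + 1
}
{code}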

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: data-loss
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-+--+
>   
> |  dir| count|
> +-+--+
> |false|2147483647|
> | true| 1|
> +-+--+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-06-23 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368096#comment-17368096
 ] 

Robert Joseph Evans commented on SPARK-35563:
-

Yes, technically if we switch it from an int to a long we will have a similar 
problem with Long.MaxValue, but that kicks the can down the road a very long 
way. With the current Spark memory layout for UnsafeRow, where there is a long 
for nullability followed by a long (possibly more) for each column, a 
single-column dataframe would need roughly 32 exabytes of memory to hold such a 
window before we hit the problem. But yes, we should look at doing an overflow 
check as well; I would just want to measure the performance impact so we can 
make an informed decision.

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: data-loss
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-+--+
>   
> |  dir| count|
> +-+--+
> |false|2147483647|
> | true| 1|
> +-+--+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364334#comment-17364334
 ] 

Robert Joseph Evans edited comment on SPARK-35089 at 6/16/21, 2:54 PM:
---

{quote}I understand ordering data, but I don't see how it impact results.{quote}

OK lets change my example just a bit and completely run through your query. 
Lets say we have two tasks.

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|10| Joe | TypeB|

Task 2:
|Start| User | Type|
|44| Anna| TypeB|
|21| Joe | TypeB|

The first thing that Spark will do is partition the data by {{User}} (which is 
what the window function asked for). So we end up with

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|44| Anna| TypeB|

Task 2:
|Start| User | Type|
|21| Joe | TypeB|
|10| Joe | TypeB|

Then each task will sort the data ascending by {{User, Start}} (We are going to 
ignore task 2 for now because there is no ambiguity in the sorting there, but I 
will show both options of sorting for Task 1).

Task 1 (sorting 1):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeB|
|44| Anna | TypeA|

Task 1 (sorting 2):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeA|
|44| Anna | TypeB|

Then the window function will run to create the end column and the duration 
(actually 2 steps, but I'll put them into one here).

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeB| 44| 0 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeA|44| 0 |
|44| Anna | TypeB|3000| 2956 |

Now lets filter {{type = 'TypeA' and duration > 4}}

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |

In sorting 2 we dropped 2 entries, but in sorting 1 we dropped only 1. This is 
because the order of the results can matter if there is ambiguity in the 
ordering and the types differ within that ambiguity.

All {{monotonically_increasing_id}} did was make sure that only one order would 
be produced. So is sorting 1 correct or is sorting 2 correct? From a SQL 
perspective either of them is a correct answer, and Spark happened to pick one 
of them.


was (Author: revans2):
{quote}I understand ordering data, but I don't see how it impact results.{quote}

OK lets change my example just a bit and completely run through your query. 
Lets say we have two tasks.

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|10| Joe | TypeB|

Task 2:
|Start| User | Type|
|44| Anna| TypeB|
|21| Joe | TypeB|

The first thing that spark will do is partition the data by {{User}} (which is 
what the window function asked). So each we end up with

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|44| Anna| TypeB|

Task 2:
|Start| User | Type|
|21| Joe | TypeB|
|10| Joe | TypeB|

Then each task will sort the data ascending by {{User, Start}} (We are going to 
ignore task 2 for now because there is no ambiguity in the sorting there, but I 
will sow both options of sorting for Task 1.

Task 1 (sorting 1):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeB|
|44| Anna | TypeA|

Task 1 (sorting 2):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeA|
|44| Anna | TypeB|

Then the window function will run to create the end column, and the duration 
(actually 2 steps, but I'll put it into one here.

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeB| 44| 0 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeA|44| 0 |
|44| Anna | TypeB|3000| 2956 |

Now lets filter {{type = 'TypeA' and duration > 4}}

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End|
|39 | Anna | TypeA|44| 5 |

In sorting 2 we dropped 2 entries, but in sorting 1 we dropped only 1. This is 
because the order of the results in can matter if there is ambiguity in the 
ordering and the types are different within that ambiguity too. 

All {{monotonically_increasing_id}} did was make sure that only one order would 
be produced.  So is sorting 1 correct or is sorting 2 correct? From a SQL 
perspective either of them is a correct answer and spark happened to pick one 
of them. 

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
>   

[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364334#comment-17364334
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

{quote}I understand ordering data, but I don't see how it impact results.{quote}

OK lets change my example just a bit and completely run through your query. 
Lets say we have two tasks.

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|10| Joe | TypeB|

Task 2:
|Start| User | Type|
|44| Anna| TypeB|
|21| Joe | TypeB|

The first thing that Spark will do is partition the data by {{User}} (which is 
what the window function asked for). So we end up with

Task 1:
|Start| User | Type|
|44| Anna | TypeA|
|39 | Anna | TypeA|
|44| Anna| TypeB|

Task 2:
|Start| User | Type|
|21| Joe | TypeB|
|10| Joe | TypeB|

Then each task will sort the data ascending by {{User, Start}} (we are going to 
ignore task 2 for now because there is no ambiguity in the sorting there, but I 
will show both options of sorting for Task 1).

Task 1 (sorting 1):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeB|
|44| Anna | TypeA|

Task 1 (sorting 2):
|Start| User | Type|
|39 | Anna | TypeA|
|44| Anna| TypeA|
|44| Anna | TypeB|

Then the window function will run to create the end column and the duration 
(actually 2 steps, but I'll put them into one here).

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeB| 44| 0 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna| TypeA|44| 0 |
|44| Anna | TypeB|3000| 2956 |

Now lets filter {{type = 'TypeA' and duration > 4}}

Task 1 (sorting 1):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |
|44| Anna | TypeA|3000| 2956 |

Task 1 (sorting 2):
|Start| User | Type|End| Duration|
|39 | Anna | TypeA|44| 5 |

In sorting 2 we dropped 2 entries, but in sorting 1 we dropped only 1. This is 
because the order of the results can matter if there is ambiguity in the 
ordering and the types differ within that ambiguity.

All {{monotonically_increasing_id}} did was make sure that only one order would 
be produced. So is sorting 1 correct or is sorting 2 correct? From a SQL 
perspective either of them is a correct answer, and Spark happened to pick one 
of them.

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already-generated data, with 
> 24,000,000 records, in a publicly available s3 bucket.
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1) .over(w), lit(3000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
>  
> these were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing changed.
>    end edit 2021-05-18
> I have found an inconsistency with count function results after lead window 
> function and filter.
>  
> I have a dataframe (this is simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define duration between two rows, and filter on that duration and 
> type. I used window lead function to get the next event time (that define end 
> for current event), so every row now gets start and stop times. If NULL (last 
> row for example), add next midnight as stop. Data is stored in ORC file 
> (tried with Parquet format, no difference)
> This only happens with multiple cluster nodes, for example AWS EMR cluster or 
> local docker cluster setup. If I run it on single instance (local on laptop), 
> I get consistent results every time. Spark version is 3.0.1, both in AWS and 
> local and docker setup.
> Here is some simple code that you can use to reproduce it, I've used 
> jupyterLab notebook on AWS EMR. Spark version is 3.0.1.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> // this 

[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17364277#comment-17364277
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

[~Tonzetic], I don't know what you mean by an error.  You have asked Spark to 
calculate something that has multiple "correct" answers.  Each time Spark runs 
one of those "correct" answers is selected, somewhat at random.  The 
{{monotonically_increasing_id()}} change reduces the number of "correct" 
answers to 1.

For example 

| Start | User | Type |
|40|Anna| TypeA |
|41|Anna| TypeB |
|40|Anna| TypeB |

You are asking Spark to sort the data by Start, and then do a window operation 
that depends on the order of the data.  But there are two correct answers to 
sorting the data.


| Start | User | Type |
|40|Anna| *TypeA* |
|40|Anna| *TypeB* |
|41|Anna| TypeB |


| Start | User | Type |
|40|Anna| *TypeB* |
|40|Anna| *TypeA* |
|41|Anna| TypeB |

So which of these is the "correct" way to sort the data? Each of these will 
produce a different answer from the window operation, and because the order of 
{{TypeA}} vs {{TypeB}} is different between the two, the relative distance 
between {{start}} and {{end}} will be different (in this case 0 vs 1). So if you 
can tell me what the correct ordering should be, then I can tell you whether 
adding the new id has made it correct, or whether it is just consistent.

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already-generated data, with 
> 24,000,000 records, in a publicly available s3 bucket.
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1) .over(w), lit(3000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
>  
> these were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing changed.
>    end edit 2021-05-18
> I have found an inconsistency with count function results after lead window 
> function and filter.
>  
> I have a dataframe (this is simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define duration between two rows, and filter on that duration and 
> type. I used window lead function to get the next event time (that define end 
> for current event), so every row now gets start and stop times. If NULL (last 
> row for example), add next midnight as stop. Data is stored in ORC file 
> (tried with Parquet format, no difference)
> This only happens with multiple cluster nodes, for example AWS EMR cluster or 
> local docker cluster setup. If I run it on single instance (local on laptop), 
> I get consistent results every time. Spark version is 3.0.1, both in AWS and 
> local and docker setup.
> Here is some simple code that you can use to reproduce it, I've used 
> jupyterLab notebook on AWS EMR. Spark version is 3.0.1.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> // this dataframe generation code should be executed only once, and data have 
> to be saved, and then opened from disk, so it's always same.
> val getRandomUser = udf(()=>{
> val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
>users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(()=>{
> val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
> types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x:Int)=>{
> x+scala.util.Random.nextInt(47)
> })
> // for loop is used to avoid out of memory error during creation of dataframe
> for( a <- 0 to 23){
> // use iterator a to continue with next million, repeat 1 mil times
> val x=Range(a*100,(a*100)+100).toDF("id").
> withColumn("start",getRandomStart(col("id"))).
> withColumn("user",getRandomUser()).
> withColumn("type",getRandomType()).
> drop("id")
> 

[jira] [Commented] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-06-14 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17362910#comment-17362910
 ] 

Robert Joseph Evans commented on SPARK-35563:
-

[~dc-heros] Thanks for looking into this. I was trying to understand how 
`row_number` being an int would cause data loss; I don't care about the 
overflow of the result of `row_number`. That does not feel like a bug to me, 
as you said. But it pointed me in the right direction.

At first I thought that there must be an issue with the buffer, but as I looked 
more closely I found a variable that does essentially the same thing as 
`row_number`.

https://github.com/apache/spark/blob/439e94c1712366ff267183d3946f2507ebf3a98e/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala#L179

It is an int and it overflows, causing the iterator to terminate the loop 
early. I think if we switch `rowNumber` to be a long the data loss problem will 
go away. But it is not a super trivial change, because `WindowFunctionFrame` 
takes that variable as input to `write`, so we need to decide whether we would 
rather blow up when `rowNumber` overflows, or change the index to be a long 
and pull on that string until everything is updated.
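
A minimal sketch of the widen-to-long option (my own illustration; the names below are 
not the actual WindowExec/WindowFunctionFrame code, and it assumes the `write` parameter 
is widened as well):

{code:scala}
// Hedged sketch: track the frame index as a Long so it cannot realistically wrap,
// and pass the widened index to the (assumed) Long-taking write function.
def processPartition(rows: Iterator[AnyRef], write: Long => Unit): Unit = {
  var rowNumber: Long = 0L        // was an Int, which is what overflowed
  while (rows.hasNext) {
    rows.next()
    write(rowNumber)
    rowNumber += 1L
  }
}
{code}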

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: data-loss
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-+--+
>   
> |  dir| count|
> +-+--+
> |false|2147483647|
> | true| 1|
> +-+--+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-06-01 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-35563:

Labels: data-loss  (was: )

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: data-loss
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-+--+
>   
> |  dir| count|
> +-+--+
> |false|2147483647|
> | true| 1|
> +-+--+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-06-01 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-35563:

Priority: Blocker  (was: Major)

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-----+----------+
> |  dir|     count|
> +-----+----------+
> |false|2147483647|
> | true|         1|
> +-----+----------+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-01 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17355088#comment-17355088
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

I should add that the above "solution" is fragile because it relies on Spark to 
keep the call to {{monotonically_increasing_id}} in the same task that reads in 
the ORC data. It really would be best if Spark could automatically insert 
something like this and then drop it later before writing/returning results.

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already generated data on an s3 
> bucket that is publicly available with 24.000.000 records
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
>  
> these were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing changed.
>    end edit 2021-05-18
> I have found an inconsistency with count function results after lead window 
> function and filter.
>  
> I have a dataframe (this is simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define the duration between two rows, and filter on that duration 
> and type. I used the window lead function to get the next event time (which 
> defines the end of the current event), so every row now gets start and stop 
> times. If it is NULL (for the last row, for example), the next midnight is 
> added as the stop. Data is stored in an ORC file (tried with Parquet format, 
> no difference).
> This only happens with multiple cluster nodes, for example AWS EMR cluster or 
> local docker cluster setup. If I run it on single instance (local on laptop), 
> I get consistent results every time. Spark version is 3.0.1, both in AWS and 
> local and docker setup.
> Here is some simple code that you can use to reproduce it, I've used 
> jupyterLab notebook on AWS EMR. Spark version is 3.0.1.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> // this dataframe generation code should be executed only once, and data have 
> to be saved, and then opened from disk, so it's always same.
> val getRandomUser = udf(()=>{
> val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
>users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(()=>{
> val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
> types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x:Int)=>{
> x+scala.util.Random.nextInt(47)
> })
> // for loop is used to avoid out of memory error during creation of dataframe
> for( a <- 0 to 23){
> // use iterator a to continue with next million, repeat 1 mil times
> val x=Range(a*1000000,(a*1000000)+1000000).toDF("id").
> withColumn("start",getRandomStart(col("id"))).
> withColumn("user",getRandomUser()).
> withColumn("type",getRandomType()).
> drop("id")
> x.write.mode("append").orc("hdfs:///random.orc")
> }
> // above code should be run only once, I used a cell in Jupyter
> // define window and lead
> val w = Window.partitionBy("user").orderBy("start")
> // if null, replace with 30.000.000
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
> // read data to dataframe, create stop column and calculate duration
> val fox2 = spark.read.orc("hdfs:///random.orc").
> withColumn("end", ts_lead).
> withColumn("duration", col("end")-col("start"))
> // repeated executions of this line returns different results for count 
> // I have it in separate cell in JupyterLab
> fox2.where("type='TypeA' and duration>4").count()
> {code}
> My results for three consecutive runs of last line were:
>  * run 1: 2551259
>  * run 2: 2550756
>  * run 3: 2551279
> It's very important to say that if I use filter:
> 

[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-06-01 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17355083#comment-17355083
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

[~Tonzetic] to be clear my point was just to provide more information about the 
problem. I agree with you that this feels very much like a bug, and I would 
like to see it fixed. My hope was that with the added information someone in 
the Spark community could look at ways to fix it and at a minimum you could 
look at ways to work around it for your particular use case.  One such option 
is to remove the ambiguity by adding in a total ordering with 
{{monotonically_increasing_id}} early on in your processing (when you read the 
data in).  You should not rely on the exact value in this column (as it can 
change based on the shape of the cluster you are running on), but you can 
use it as a part of your ordering to get unambiguous results.

For example.
 
{code:scala}
// define window and lead
val w = Window.partitionBy("user").orderBy("start", "unambiguous_id")
// if null, replace with 30.000.000
val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))

// read data to dataframe, create stop column and calculate duration
val fox2 = spark.read.orc("hdfs:///random.orc").
withColumn("unambiguous_id",  monotonically_increasing_id()).
withColumn("end", ts_lead).
withColumn("duration", col("end")-col("start"))


// repeated executions of this line returns different results for count 
// I have it in separate cell in JupyterLab
fox2.where("type='TypeA' and duration>4").count()
{code}

The above code should produce the exact same result, every time, no matter 
where it is run, or how it is run.  If you have a separate unique ID per row, 
which often exists as a primary key, you could use that instead of the 
{{monotonically_increasing_id}} to remove the ambiguity.
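
If the data already has such a key, a minimal sketch of that variant would look 
like the following ({{event_id}} is a hypothetical column name, it is not in 
the generated dataset above):

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Use an existing unique key as the tie-breaker instead of
// monotonically_increasing_id(); the window ordering is then total.
val w = Window.partitionBy("user").orderBy("start", "event_id")
// if null, replace with 30.000.000 (mirrors the snippet above)
val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))

val fox2 = spark.read.orc("hdfs:///random.orc").
  withColumn("end", ts_lead).
  withColumn("duration", col("end") - col("start"))

// With a total ordering this count should be stable across runs.
fox2.where("type='TypeA' and duration>4").count()
{code}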

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already generated data on an s3 
> bucket that is publicly available with 24.000.000 records
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
>  
> these were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing changed.
>    end edit 2021-05-18
> I have found an inconsistency with count function results after lead window 
> function and filter.
>  
> I have a dataframe (this is simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define the duration between two rows, and filter on that duration 
> and type. I used the window lead function to get the next event time (which 
> defines the end of the current event), so every row now gets start and stop 
> times. If it is NULL (for the last row, for example), the next midnight is 
> added as the stop. Data is stored in an ORC file (tried with Parquet format, 
> no difference).
> This only happens with multiple cluster nodes, for example AWS EMR cluster or 
> local docker cluster setup. If I run it on single instance (local on laptop), 
> I get consistent results every time. Spark version is 3.0.1, both in AWS and 
> local and docker setup.
> Here is some simple code that you can use to reproduce it, I've used 
> jupyterLab notebook on AWS EMR. Spark version is 3.0.1.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> // this dataframe generation code should be executed only once, and data have 
> to be saved, and then opened from disk, so it's always same.
> val getRandomUser = udf(()=>{
> val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
>users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(()=>{
> val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
> types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = 

[jira] [Commented] (SPARK-35089) non consistent results running count for same dataset after filter and lead window function

2021-05-29 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17353778#comment-17353778
 ] 

Robert Joseph Evans commented on SPARK-35089:
-

On window functions, if the {{order by}} clause is ambiguous you can get 
different results from one run to the next.  This is because the order in which 
shuffle data is read back in is not deterministic, even though the sorting is.  
In this example you are generating start with

{code}
val getRandomStart = udf((x:Int)=>{
  x+scala.util.Random.nextInt(47)
})
{code}

The input to this, {{x}}, appears to be unambiguous (0 to some very large 
number), but because of the added {{Random.nextInt(47)}} (0 to 46) there is the 
possibility of multiple start values being the same.

So for operations where order matters you can get ambiguous results. For lead 
and lag a different lead/lag value can show up, because the row right after 
this one ({{lead(1)}}) can be different.  For operations like rank, dense_rank, 
and row_number the order of the output values is the same, but the rows are in 
a different order, so the value at each row/rank is different. This can also 
impact operations like SUM, MIN, and MAX that use row bounds on a window 
instead of value ranges.  I'm not sure if this should be considered a bug or 
not.  Spark treats all window operations as deterministic, so in theory if 
there is a crash you can get inconsistent results within the same query, but 
that only happens if the end user supplied an ambiguous ordering.
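
As a minimal, hypothetical illustration of the ambiguity (made-up rows, not the 
reporter's data): two rows tie on the {{order by}} column, so both orders 
satisfy the sort and {{lead(1)}} for the first row can legally be either of 
them. With a tiny local dataset like this you will usually see the same order 
every run; on a real shuffle the order the data is read back in decides it.

{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// Rows B and C tie on "start", so a sort on "start" alone may put either one
// first.  lead("tag", 1) for row A is then "B" on one run and "C" on another.
val tied = Seq(
  ("u1", 5,  "A"),
  ("u1", 10, "B"),
  ("u1", 10, "C")
).toDF("user", "start", "tag")

val w = Window.partitionBy("user").orderBy("start")
tied.withColumn("next_tag", lead("tag", 1).over(w)).show()
{code}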

> non consistent results running count for same dataset after filter and lead 
> window function
> ---
>
> Key: SPARK-35089
> URL: https://issues.apache.org/jira/browse/SPARK-35089
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.1.1
>Reporter: Domagoj
>Priority: Major
>
>    edit 2021-05-18
> I have made it simpler to reproduce; I've put already generated data on an s3 
> bucket that is publicly available with 24.000.000 records
> Now all you need to do is run this code:
> {code:java}
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql._
> import org.apache.spark.sql.functions._
> val w = Window.partitionBy("user").orderBy("start")
> val ts_lead = coalesce(lead("start", 1).over(w), lit(30000000))
> spark.read.orc("s3://dtonzetic-spark-sample-data/sample-data.orc").
>  withColumn("end", ts_lead).
>  withColumn("duration", col("end")-col("start")).
>  where("type='TypeA' and duration>4").count()
> {code}
>  
> these were my results:
>  - run 1: 2547559
>  - run 2: 2547559
>  - run 3: 2547560
>  - run 4: 2547558
>  - run 5: 2547558
>  - run 6: 2547559
>  - run 7: 2547558
> These results are from a new EMR cluster, version 6.3.0, so nothing changed.
>    end edit 2021-05-18
> I have found an inconsistency with count function results after lead window 
> function and filter.
>  
> I have a dataframe (this is simplified version, but it's enough to reproduce) 
> with millions of records, with these columns:
>  * df1:
>  ** start(timestamp)
>  ** user_id(int)
>  ** type(string)
> I need to define the duration between two rows, and filter on that duration 
> and type. I used the window lead function to get the next event time (which 
> defines the end of the current event), so every row now gets start and stop 
> times. If it is NULL (for the last row, for example), the next midnight is 
> added as the stop. Data is stored in an ORC file (tried with Parquet format, 
> no difference).
> This only happens with multiple cluster nodes, for example AWS EMR cluster or 
> local docker cluster setup. If I run it on single instance (local on laptop), 
> I get consistent results every time. Spark version is 3.0.1, both in AWS and 
> local and docker setup.
> Here is some simple code that you can use to reproduce it, I've used 
> jupyterLab notebook on AWS EMR. Spark version is 3.0.1.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Window
> // this dataframe generation code should be executed only once, and data have 
> to be saved, and then opened from disk, so it's always same.
> val getRandomUser = udf(()=>{
> val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
>users(scala.util.Random.nextInt(7))
> })
> val getRandomType = udf(()=>{
> val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
> types(scala.util.Random.nextInt(5))
> })
> val getRandomStart = udf((x:Int)=>{
> x+scala.util.Random.nextInt(47)
> })
> // for loop is used to avoid out of memory error during creation of dataframe
> for( a <- 0 to 23){
> // use iterator a to continue with next million, repeat 1 mil times
> val x=Range(a*1000000,(a*1000000)+1000000).toDF("id").
> withColumn("start",getRandomStart(col("id"))).
> 

[jira] [Updated] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-05-29 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-35563:

Priority: Blocker  (was: Major)

> [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows
> --
>
> Key: SPARK-35563
> URL: https://issues.apache.org/jira/browse/SPARK-35563
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>
> I think this impacts a lot more versions of Spark, but I don't know for sure 
> because it takes a long time to test. As a part of doing corner case 
> validation testing for spark rapids I found that if a window function has 
> more than {{Int.MaxValue + 1}} rows the result is silently truncated to that 
> many rows. I have only tested this on 3.0.2 with {{row_number}}, but I 
> suspect it will impact others as well. This is a really rare corner case, but 
> because it is silent data corruption I personally think it is quite serious.
> {code:scala}
> import org.apache.spark.sql.expressions.Window
> val windowSpec = Window.partitionBy("a").orderBy("b")
> val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as 
> b")
> spark.time(df.select(col("a"), col("b"), 
> row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
> desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))
> +-----+----------+
> |  dir|     count|
> +-----+----------+
> |false|2147483647|
> | true|         1|
> +-----+----------+
> Time taken: 1139089 ms
> Int.MaxValue.toLong + 100
> res15: Long = 2147483747
> 2147483647L + 1
> res16: Long = 2147483648
> {code}
> I had to make sure that I ran the above with at least 64GiB of heap for the 
> executor (I did it in local mode and it worked, but took forever to run)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35563) [SQL] Window operations with over Int.MaxValue + 1 rows can silently drop rows

2021-05-29 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-35563:
---

 Summary: [SQL] Window operations with over Int.MaxValue + 1 rows 
can silently drop rows
 Key: SPARK-35563
 URL: https://issues.apache.org/jira/browse/SPARK-35563
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.2
Reporter: Robert Joseph Evans


I think this impacts a lot more versions of Spark, but I don't know for sure 
because it takes a long time to test. As a part of doing corner case validation 
testing for spark rapids I found that if a window function has more than 
{{Int.MaxValue + 1}} rows the result is silently truncated to that many rows. I 
have only tested this on 3.0.2 with {{row_number}}, but I suspect it will 
impact others as well. This is a really rare corner case, but because it is 
silent data corruption I personally think it is quite serious.
{code:scala}
import org.apache.spark.sql.expressions.Window

val windowSpec = Window.partitionBy("a").orderBy("b")

val df = spark.range(Int.MaxValue.toLong + 100).selectExpr(s"1 as a", "id as b")

spark.time(df.select(col("a"), col("b"), 
row_number().over(windowSpec).alias("rn")).orderBy(desc("a"), 
desc("b")).select((col("rn") < 0).alias("dir")).groupBy("dir").count.show(20))

+-----+----------+
|  dir|     count|
+-----+----------+
|false|2147483647|
| true|         1|
+-----+----------+

Time taken: 1139089 ms

Int.MaxValue.toLong + 100
res15: Long = 2147483747

2147483647L + 1
res16: Long = 2147483648
{code}
I had to make sure that I ran the above with at least 64GiB of heap for the 
executor (I did it in local mode and it worked, but took forever to run)
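
As an aside on why exactly one row shows up with {{rn < 0}} (this is just my 
reading of the symptom, not a pointer at where the truncation happens): 
{{row_number}} produces an Int, so the row after {{Int.MaxValue}} wraps around 
to {{Int.MinValue}}.

{code:scala}
scala> Int.MaxValue
res0: Int = 2147483647

scala> Int.MaxValue + 1   // silent Int overflow, consistent with the single rn < 0 row
res1: Int = -2147483648
{code}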



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-35108) Pickle produces incorrect key labels for GenericRowWithSchema (data corruption)

2021-05-04 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338948#comment-17338948
 ] 

Robert Joseph Evans edited comment on SPARK-35108 at 5/4/21, 12:34 PM:
---

Looks good.  Thanks for the fix.

feel free to close this as a dupe of the other.


was (Author: revans2):
Looks good.  Thanks for the fix.

> Pickle produces incorrect key labels for GenericRowWithSchema (data 
> corruption)
> ---
>
> Key: SPARK-35108
> URL: https://issues.apache.org/jira/browse/SPARK-35108
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: test.py, test.sh
>
>
> I think this also shows up for all versions of Spark that pickle the data 
> when doing a collect from python.
> When you do a collect in python java will do a collect and convert the 
> UnsafeRows into GenericRowWithSchema instances before it sends them to the 
> Pickler. The Pickler, by default, will try to dedupe objects using hashCode 
> and .equals for the object.  But .equals and .hashCode for 
> GenericRowWithSchema only looks at the data, not the schema. But when we 
> pickle the row the keys from the schema are written out.
> This can result in data corruption, sort of, in a few cases where a row has 
> the same number of elements as a struct within the row does, or a sub-struct 
> within another struct. 
> If the data happens to be the same, the keys for the resulting row or struct 
> can be wrong.
> My repro case is a bit convoluted, but it does happen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35108) Pickle produces incorrect key labels for GenericRowWithSchema (data corruption)

2021-05-04 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17338948#comment-17338948
 ] 

Robert Joseph Evans commented on SPARK-35108:
-

Looks good.  Thanks for the fix.

> Pickle produces incorrect key labels for GenericRowWithSchema (data 
> corruption)
> ---
>
> Key: SPARK-35108
> URL: https://issues.apache.org/jira/browse/SPARK-35108
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: test.py, test.sh
>
>
> I think this also shows up for all versions of Spark that pickle the data 
> when doing a collect from python.
> When you do a collect in python java will do a collect and convert the 
> UnsafeRows into GenericRowWithSchema instances before it sends them to the 
> Pickler. The Pickler, by default, will try to dedupe objects using hashCode 
> and .equals for the object.  But .equals and .hashCode for 
> GenericRowWithSchema only looks at the data, not the schema. But when we 
> pickle the row the keys from the schema are written out.
> This can result in data corruption, sort of, in a few cases where a row has 
> the same number of elements as a struct within the row does, or a sub-struct 
> within another struct. 
> If the data happens to be the same, the keys for the resulting row or struct 
> can be wrong.
> My repro case is a bit convoluted, but it does happen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-35108) Pickle produces incorrect key labels for GenericRowWithSchema (data corruption)

2021-04-16 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324088#comment-17324088
 ] 

Robert Joseph Evans commented on SPARK-35108:
-

If you have SPARK_HOME set when you run test.sh on a system with 6 or more 
cores, it should reproduce the issue.

I was able to mitigate the issue by adding .equals and .hashCode overrides to 
GenericRowWithSchema so that they take the schema into account. But we could 
also try to turn off object dedupe or value-compare dedupe (the Pickler has 
options to disable these). I am not sure what the proper fix would be because 
this code is shared with other code paths.
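
For reference, a small sketch of the equality behavior in question (the schemas 
and values here are made up for illustration): two {{GenericRowWithSchema}} 
instances with the same values but different schemas compare as equal, which is 
what lets the Pickler's memoization substitute one for the other.

{code:scala}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

val schemaA = StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType)))
val schemaB = StructType(Seq(StructField("x", IntegerType), StructField("y", IntegerType)))

val rowA = new GenericRowWithSchema(Array[Any](1, 2), schemaA)
val rowB = new GenericRowWithSchema(Array[Any](1, 2), schemaB)

// equals/hashCode only consider the values, not the schema, so these match
// even though the field names differ.
println(rowA == rowB)                    // true
println(rowA.hashCode == rowB.hashCode)  // true
{code}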

> Pickle produces incorrect key labels for GenericRowWithSchema (data 
> corruption)
> ---
>
> Key: SPARK-35108
> URL: https://issues.apache.org/jira/browse/SPARK-35108
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: test.py, test.sh
>
>
> I think this also shows up for all versions of Spark that pickle the data 
> when doing a collect from python.
> When you do a collect in python java will do a collect and convert the 
> UnsafeRows into GenericRowWithSchema instances before it sends them to the 
> Pickler. The Pickler, by default, will try to dedupe objects using hashCode 
> and .equals for the object.  But .equals and .hashCode for 
> GenericRowWithSchema only looks at the data, not the schema. But when we 
> pickle the row the keys from the schema are written out.
> This can result in data corruption, sort of, in a few cases where a row has 
> the same number of elements as a struct within the row does, or a sub-struct 
> within another struct. 
> If the data happens to be the same, the keys for the resulting row or struct 
> can be wrong.
> My repro case is a bit convoluted, but it does happen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-35108) Pickle produces incorrect key labels for GenericRowWithSchema (data corruption)

2021-04-16 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-35108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-35108:

Attachment: test.sh
test.py

> Pickle produces incorrect key labels for GenericRowWithSchema (data 
> corruption)
> ---
>
> Key: SPARK-35108
> URL: https://issues.apache.org/jira/browse/SPARK-35108
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.1, 3.0.2
>Reporter: Robert Joseph Evans
>Priority: Major
> Attachments: test.py, test.sh
>
>
> I think this also shows up for all versions of Spark that pickle the data 
> when doing a collect from python.
> When you do a collect in python java will do a collect and convert the 
> UnsafeRows into GenericRowWithSchema instances before it sends them to the 
> Pickler. The Pickler, by default, will try to dedupe objects using hashCode 
> and .equals for the object.  But .equals and .hashCode for 
> GenericRowWithSchema only looks at the data, not the schema. But when we 
> pickle the row the keys from the schema are written out.
> This can result in data corruption, sort of, in a few cases where a row has 
> the same number of elements as a struct within the row does, or a sub-struct 
> within another struct. 
> If the data happens to be the same, the keys for the resulting row or struct 
> can be wrong.
> My repro case is a bit convoluted, but it does happen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-35108) Pickle produces incorrect key labels for GenericRowWithSchema (data corruption)

2021-04-16 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-35108:
---

 Summary: Pickle produces incorrect key labels for 
GenericRowWithSchema (data corruption)
 Key: SPARK-35108
 URL: https://issues.apache.org/jira/browse/SPARK-35108
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.2, 3.0.1
Reporter: Robert Joseph Evans
 Attachments: test.py, test.sh

I think this also shows up for all versions of Spark that pickle the data when 
doing a collect from python.

When you do a collect in python java will do a collect and convert the 
UnsafeRows into GenericRowWithSchema instances before it sends them to the 
Pickler. The Pickler, by default, will try to dedupe objects using hashCode and 
.equals for the object.  But .equals and .hashCode for GenericRowWithSchema 
only looks at the data, not the schema. But when we pickle the row the keys 
from the schema are written out.

This can result in data corruption, sort of, in a few cases where a row has the 
same number of elements as a struct within the row does, or a sub-struct within 
another struct. 

If the data happens to be the same, the keys for the resulting row or struct 
can be wrong.

My repro case is a bit convoluted, but it does happen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32110) -0.0 vs 0.0 is inconsistent

2020-12-10 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247362#comment-17247362
 ] 

Robert Joseph Evans commented on SPARK-32110:
-

Thanks [~tanelk] then resolving this is fine.  I would have liked some official 
documentation of what the expected behavior is, but just having it here is OK.

> -0.0 vs 0.0 is inconsistent
> ---
>
> Key: SPARK-32110
> URL: https://issues.apache.org/jira/browse/SPARK-32110
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.2, 3.1.0
>
>
> This is related to SPARK-26021 where some things were fixed but there is 
> still a lot that is not consistent.
> When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick 
> results that appear to be correct but are totally inconsistent for the same 
> operators.
> {code:java}
> scala> import spark.implicits._
> import spark.implicits._
> scala> spark.sql("SELECT 0.0 = -0.0").collect
> res0: Array[org.apache.spark.sql.Row] = Array([true])
> scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect
> res1: Array[org.apache.spark.sql.Row] = Array([false])
> {code}
> This also shows up in sorts
> {code:java}
> scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, 
> -100.0)).toDF("a", "b").orderBy("a", "b").collect
> res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], 
> [0.0,-100.0], [0.0,100.0])
> {code}
> But not for a equi-join or for an aggregate
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect
> res6: Array[org.apache.spark.sql.Row] = Array([0.0,2])
> {code}
> This can lead to some very odd results. Like an equi-join with a filter that 
> logically should do nothing, but ends up filtering the result to nothing.
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect
> res8: Array[org.apache.spark.sql.Row] = Array()
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> {code}
> Hive never normalizes -0.0 to 0.0 so this results in non-ieee complaint 
> behavior everywhere, but at least it is consistently odd.
> MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to 
> {{0.0}}.
> The root cause of this appears to be that the java implementation of 
> {{Double.compare}} and {{Float.compare}} for open JDK places {{-0.0}} < 
> {{0.0}}.
> This is not documented in the java docs but it is clearly documented in the 
> code, so it is not a "bug" that java is going to fix.
> [https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035]
> It is also consistent with what is in the java docs for {{Double.equals}}
>  
> [https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-]
> To be clear I am filing this mostly to document the current state rather than 
> to think it needs to be fixed ASAP. It is a rare corner case, but ended up 
> being really frustrating for me to debug what was happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32110) -0.0 vs 0.0 is inconsistent

2020-12-10 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17247305#comment-17247305
 ] 

Robert Joseph Evans commented on SPARK-32110:
-

I have not tried this again on the latest master branch.  I will try to find 
time today to build/rerun and see if things have changed at all.

> -0.0 vs 0.0 is inconsistent
> ---
>
> Key: SPARK-32110
> URL: https://issues.apache.org/jira/browse/SPARK-32110
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.2, 3.1.0
>
>
> This is related to SPARK-26021 where some things were fixed but there is 
> still a lot that is not consistent.
> When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick 
> results that appear to be correct but are totally inconsistent for the same 
> operators.
> {code:java}
> scala> import spark.implicits._
> import spark.implicits._
> scala> spark.sql("SELECT 0.0 = -0.0").collect
> res0: Array[org.apache.spark.sql.Row] = Array([true])
> scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect
> res1: Array[org.apache.spark.sql.Row] = Array([false])
> {code}
> This also shows up in sorts
> {code:java}
> scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, 
> -100.0)).toDF("a", "b").orderBy("a", "b").collect
> res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], 
> [0.0,-100.0], [0.0,100.0])
> {code}
> But not for a equi-join or for an aggregate
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect
> res6: Array[org.apache.spark.sql.Row] = Array([0.0,2])
> {code}
> This can lead to some very odd results. Like an equi-join with a filter that 
> logically should do nothing, but ends up filtering the result to nothing.
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect
> res8: Array[org.apache.spark.sql.Row] = Array()
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> {code}
> Hive never normalizes -0.0 to 0.0 so this results in non-ieee complaint 
> behavior everywhere, but at least it is consistently odd.
> MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to 
> {{0.0}}.
> The root cause of this appears to be that the java implementation of 
> {{Double.compare}} and {{Float.compare}} for open JDK places {{-0.0}} < 
> {{0.0}}.
> This is not documented in the java docs but it is clearly documented in the 
> code, so it is not a "bug" that java is going to fix.
> [https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035]
> It is also consistent with what is in the java docs for {{Double.equals}}
>  
> [https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-]
> To be clear I am filing this mostly to document the current state rather than 
> to think it needs to be fixed ASAP. It is a rare corner case, but ended up 
> being really frustrating for me to debug what was happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32110) -0.0 vs 0.0 is inconsistent

2020-12-09 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17246585#comment-17246585
 ] 

Robert Joseph Evans commented on SPARK-32110:
-

Are we sure that this issue should be closed?

The PR https://github.com/apache/spark/pull/30673 fixed the issue with HLL++

I don't think the issues with equals and null safe equals described 
[here|https://issues.apache.org/jira/browse/SPARK-32110?focusedCommentId=17183107=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17183107]
 were fixed, but I am happy to be wrong.

Also nothing was done to address the odd behavior I called out initially (I was 
hoping to at least have it documented somewhere so users can know what the 
expected operation is, instead of being surprised)

> -0.0 vs 0.0 is inconsistent
> ---
>
> Key: SPARK-32110
> URL: https://issues.apache.org/jira/browse/SPARK-32110
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Assignee: Wenchen Fan
>Priority: Major
> Fix For: 3.0.2, 3.1.0
>
>
> This is related to SPARK-26021 where some things were fixed but there is 
> still a lot that is not consistent.
> When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick 
> results that appear to be correct but are totally inconsistent for the same 
> operators.
> {code:java}
> scala> import spark.implicits._
> import spark.implicits._
> scala> spark.sql("SELECT 0.0 = -0.0").collect
> res0: Array[org.apache.spark.sql.Row] = Array([true])
> scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect
> res1: Array[org.apache.spark.sql.Row] = Array([false])
> {code}
> This also shows up in sorts
> {code:java}
> scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, 
> -100.0)).toDF("a", "b").orderBy("a", "b").collect
> res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], 
> [0.0,-100.0], [0.0,100.0])
> {code}
> But not for a equi-join or for an aggregate
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect
> res6: Array[org.apache.spark.sql.Row] = Array([0.0,2])
> {code}
> This can lead to some very odd results. Like an equi-join with a filter that 
> logically should do nothing, but ends up filtering the result to nothing.
> {code:java}
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect
> res8: Array[org.apache.spark.sql.Row] = Array()
> scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
> "r_b"), $"a" === $"r_a").collect
> res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
> {code}
> Hive never normalizes -0.0 to 0.0 so this results in non-ieee complaint 
> behavior everywhere, but at least it is consistently odd.
> MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to 
> {{0.0}}.
> The root cause of this appears to be that the java implementation of 
> {{Double.compare}} and {{Float.compare}} for open JDK places {{-0.0}} < 
> {{0.0}}.
> This is not documented in the java docs but it is clearly documented in the 
> code, so it is not a "bug" that java is going to fix.
> [https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035]
> It is also consistent with what is in the java docs for {{Double.equals}}
>  
> [https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-]
> To be clear I am filing this mostly to document the current state rather than 
> to think it needs to be fixed ASAP. It is a rare corner case, but ended up 
> being really frustrating for me to debug what was happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-22 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17182345#comment-17182345
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

Honestly, it is not a big deal what happened.  I have worked on enough 
open-source projects that I know that all of this is best-effort run by 
volunteers.  Plus my involvement in the Spark project has not been frequent 
enough for a lot of people to know that I am a PMC member and honestly I have 
not been involved enough lately to know the process myself.  So I am happy to 
have people correct me or treat me like I am a contributor instead. The 
important thing is that we fixed the bug, and it should start to roll out soon.

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.4, 2.4.6, 3.0.0, 3.0.1, 3.1.0
>Reporter: Robert Joseph Evans
>Assignee: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Fix For: 2.4.7, 3.0.1, 3.1.0
>
> Attachments: bad_order.snappy.parquet, small_bad.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-21 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181873#comment-17181873
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

OK, reading through the code I understand what is happening now.  The 
compression format ignores nulls, which are stored separately.  As such, the 
bit set that is stored only covers the non-null boolean values, and the entry 
count stored in the compression format is the number of non-null boolean 
values.

So the stopping condition on a batch decompress,

{code}
while (visitedLocal < countLocal) {
{code}

skips all of the null values at the end.  But because the length of the column 
is known ahead of time, those trailing entries fall back to the default value, 
which is false.

I'll try to get a patch up shortly to fix this.
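
To make the failure mode concrete, a heavily simplified sketch (this is not the 
actual BooleanBitSet decoder, and all names here are made up): the loop is 
bounded by the non-null count, so trailing null slots are never written and 
keep the column's default value of false.

{code:scala}
// bits: decompressed non-null booleans in row order
// isNull: null mask for the whole batch (bits.length must equal the number of
//         false entries in isNull for this sketch to be well formed)
def decodeBatch(bits: Array[Boolean], isNull: Array[Boolean]): Array[Boolean] = {
  val numRows = isNull.length
  val out = Array.fill(numRows)(false)   // default value is false
  val count = bits.length                // counts non-null values only
  var visited = 0
  var row = 0
  while (visited < count) {              // analogous to visitedLocal < countLocal
    if (!isNull(row)) {
      out(row) = bits(visited)
      visited += 1
    }
    row += 1
  }
  // Rows after the last non-null value are never touched; if their null flags
  // are not honored on the read side they come back as false instead of null.
  out
}
{code}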

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1, 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet, small_bad.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-21 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32672:

Attachment: small_bad.snappy.parquet

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1, 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet, small_bad.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-21 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181868#comment-17181868
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

So I was able to reduce the corruption down to just a single 10,000 row chunk 
and still get it to happen. I'll post a new parquet file soon that will 
hopefully make debugging a little simpler.

{code}
scala> val bad_order = 
spark.read.parquet("/home/roberte/src/rapids-plugin-4-spark/integration_tests/bad_order.snappy.parquet").selectExpr("b",
 "monotonically_increasing_id() as id").where(col("id")>=7 and col("id") < 
8)
bad_order: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [b: 
boolean, id: bigint]

scala> bad_order.groupBy("b").count.show
+-----+-----+
|    b|count|
+-----+-----+
| null|  619|
| true| 4701|
|false| 4680|
+-----+-----+


scala> bad_order.cache()
res2: bad_order.type = [b: boolean, id: bigint]

scala> bad_order.groupBy("b").count.show
+-----+-----+
|    b|count|
+-----+-----+
| null|  618|
| true| 4701|
|false| 4681|
+-----+-----+

{code}

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1, 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32672:

Affects Version/s: 3.1.0

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1, 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181486#comment-17181486
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

I added some debugging to the compression code and it looks like in the 8th 
CompressedBatch of 10,000 entries the number of nulls seen was different from 
the number expected.

619 expected and 618 seen.  I'll try to debug this a bit more tomorrow.

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache that the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see after the data is cached a single null values 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]  
>   
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181478#comment-17181478
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

I did a little debugging and found that `BooleanBitSet$Encoder` is being used 
for compression.  There are other data orderings that use the same encoder and 
produce correct results though.

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0, 3.0.1
>Reporter: Robert Joseph Evans
>Priority: Blocker
>  Labels: correctness
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181466#comment-17181466
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

I verified that this is still happening on 3.1.0-SNAPSHOT  too

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17181459#comment-17181459
 ] 

Robert Joseph Evans commented on SPARK-32672:
-

I verified that this is still happening on 3.0.2-SNAPSHOT

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32672:

Affects Version/s: 2.4.6

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.6, 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32672:

Summary: Data corruption in some cached compressed boolean columns  (was: 
Daat corruption in some cached compressed boolean columns)

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32672) Data corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32672:

Attachment: bad_order.snappy.parquet

> Data corruption in some cached compressed boolean columns
> -
>
> Key: SPARK-32672
> URL: https://issues.apache.org/jira/browse/SPARK-32672
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Blocker
> Attachments: bad_order.snappy.parquet
>
>
> I found that when sorting some boolean data into the cache, the results 
> can change when the data is read back out.
> It needs to be a non-trivial amount of data, and it is highly dependent on 
> the order of the data.  If I disable compression in the cache the issue goes 
> away.  I was able to make this happen in 3.0.0.  I am going to try and 
> reproduce it in other versions too.
> I'll attach the parquet file with boolean data in an order that causes this 
> to happen. As you can see, after the data is cached a single null value 
> switches over to be false.
> {code}
> scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
> bad_order: org.apache.spark.sql.DataFrame = [b: boolean]
>
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7153|
> | true|54334|
> |false|54021|
> +-----+-----+
> scala> bad_order.cache()
> res1: bad_order.type = [b: boolean]
> scala> bad_order.groupBy("b").count.show
> +-----+-----+
> |    b|count|
> +-----+-----+
> | null| 7152|
> | true|54334|
> |false|54022|
> +-----+-----+
> scala> 
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32672) Daat corruption in some cached compressed boolean columns

2020-08-20 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-32672:
---

 Summary: Daat corruption in some cached compressed boolean columns
 Key: SPARK-32672
 URL: https://issues.apache.org/jira/browse/SPARK-32672
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans
 Attachments: bad_order.snappy.parquet

I found that when sorting some boolean data into the cache, the results can 
change when the data is read back out.

It needs to be a non-trivial amount of data, and it is highly dependent on the 
order of the data.  If I disable compression in the cache the issue goes away.  
I was able to make this happen in 3.0.0.  I am going to try and reproduce it in 
other versions too.

I'll attach the parquet file with boolean data in an order that causes this to 
happen. As you can see, after the data is cached a single null value switches 
over to be false.

{code}
scala> val bad_order = spark.read.parquet("./bad_order.snappy.parquet")
bad_order: org.apache.spark.sql.DataFrame = [b: boolean]

scala> bad_order.groupBy("b").count.show
+-----+-----+
|    b|count|
+-----+-----+
| null| 7153|
| true|54334|
|false|54021|
+-----+-----+

scala> bad_order.cache()
res1: bad_order.type = [b: boolean]

scala> bad_order.groupBy("b").count.show
+-----+-----+
|    b|count|
+-----+-----+
| null| 7152|
| true|54334|
|false|54022|
+-----+-----+


scala> 

{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32612) int columns produce inconsistent results on pandas UDFs

2020-08-17 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17178927#comment-17178927
 ] 

Robert Joseph Evans commented on SPARK-32612:
-

This is just one example that shows what can happen. Yes, if I am getting 
overflows, arguably an inconsistent floating-point cast is not my biggest 
problem. I used it simply to better show the difference in the result.  There 
are other similar problems that can show up but are much more subtle.  A double 
has 52 explicit bits in the fraction section of the format (53 bits of 
effective precision once the implicit leading bit is counted), so any long 
value that needs more than 53 bits will produce different results if it is 
first cast to a floating-point value for processing.

Is this a serious problem? I'm not totally sure, which is why I left it as the 
default priority. It violates the principle of least surprise. As a developer, 
I would not expect this to happen.  Is it likely to be something that someone 
sees in production? I doubt it. The only reason I found this was because I was 
testing with random numbers to try and validate changes I was making as a part 
of https://github.com/NVIDIA/spark-rapids. I mostly wanted to be sure that this 
was documented. I'll leave it up to the community if this is something that is 
important enough to fix.
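
A quick Scala illustration of the 53-bit point (my own sketch, separate from 
the pandas reproduction above):

{code}
// Any long that needs more than 53 bits of precision cannot survive a round
// trip through a double: the low bit is rounded away.
val big = (1L << 53) + 1        // 9007199254740993, needs 54 bits
val viaDouble = big.toDouble.toLong
big == viaDouble                // false: viaDouble is 9007199254740992
{code}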

> int columns produce inconsistent results on pandas UDFs
> ---
>
> Key: SPARK-32612
> URL: https://issues.apache.org/jira/browse/SPARK-32612
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> This is similar to SPARK-30187 but I personally consider this data corruption.
> If I have a simple pandas UDF
> {code}
>  >>> def add(a, b):
> return a + b
>  >>> my_udf = pandas_udf(add, returnType=LongType())
> {code}
> And I want to process some data with it, say 32 bit ints
> {code}
> >>> df = spark.createDataFrame([(1037694399, 1204615848),(3,4)], 
> >>> StructType([StructField("a", IntegerType()), StructField("b", 
> >>> IntegerType())]))
> >>> df.select(my_udf(col("a") - 3, col("b"))).show()
> +----------+----------+---------------+
> |         a|         b|add((a - 3), b)|
> +----------+----------+---------------+
> |1037694399|1204615848|    -2052657052|
> |         3|         4|              4|
> +----------+----------+---------------+
> {code}
> I get an integer overflow for the data as I would expect.  But as soon as I 
> add a {{None}} to the data, even on a different row the result I get back is 
> totally different.
> {code}
> >>> df = spark.createDataFrame([(1037694399, 1204615848),(3,None)], 
> >>> StructType([StructField("a", IntegerType()), StructField("b", 
> >>> IntegerType())]))
> >>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
> +----------+----------+---------------+
> |         a|         b|add((a - 3), b)|
> +----------+----------+---------------+
> |1037694399|1204615848|     2242310244|
> |         3|      null|           null|
> +----------+----------+---------------+
> {code}
> The integer overflow disappears.  This is because arrow and/or pandas changes 
> the data type to a float in order to be able to store the null value.  Since 
> the processing is then done in floating point, there is no overflow.  This in 
> and of itself is annoying but understandable because it is dealing with a 
> limitation in pandas. 
> Where it becomes a bug is that this happens on a per batch basis.  This means 
> that I can have the same two rows in different parts of my data set and get 
> different results depending on their proximity to a null value.
> {code}
> >>> df = spark.createDataFrame([(1037694399, 
> >>> 1204615848),(3,None),(1037694399, 1204615848),(3,4)], 
> >>> StructType([StructField("a", IntegerType()), StructField("b", 
> >>> IntegerType())]))
> >>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
> +----------+----------+---------------+
> |         a|         b|add((a - 3), b)|
> +----------+----------+---------------+
> |1037694399|1204615848|     2242310244|
> |         3|      null|           null|
> |1037694399|1204615848|     2242310244|
> |         3|         4|              4|
> +----------+----------+---------------+
> >>> spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '2')
> >>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
> +----------+----------+---------------+
> |         a|         b|add((a - 3), b)|
> +----------+----------+---------------+
> |1037694399|1204615848|     2242310244|
> |         3|      null|           null|
> |1037694399|1204615848|    -2052657052|
> |         3|         4|              4|
> +----------+----------+---------------+
> {code}
> For me personally I would prefer to have all nullable integer columns upgraded 
> to float all the time, that way it is at least consistent.

[jira] [Created] (SPARK-32612) int columns produce inconsistent results on pandas UDFs

2020-08-13 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-32612:
---

 Summary: int columns produce inconsistent results on pandas UDFs
 Key: SPARK-32612
 URL: https://issues.apache.org/jira/browse/SPARK-32612
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


This is similar to SPARK-30187 but I personally consider this data corruption.

If I have a simple pandas UDF
{code}
 >>> def add(a, b):
return a + b
 >>> my_udf = pandas_udf(add, returnType=LongType())
{code}

And I want to process some data with it, say 32 bit ints
{code}
>>> df = spark.createDataFrame([(1037694399, 1204615848),(3,4)], 
>>> StructType([StructField("a", IntegerType()), StructField("b", 
>>> IntegerType())]))

>>> df.select(my_udf(col("a") - 3, col("b"))).show()
+----------+----------+---------------+
|         a|         b|add((a - 3), b)|
+----------+----------+---------------+
|1037694399|1204615848|    -2052657052|
|         3|         4|              4|
+----------+----------+---------------+
{code}

I get an integer overflow for the data as I would expect.  But as soon as I add 
a {{None}} to the data, even on a different row the result I get back is 
totally different.

{code}
>>> df = spark.createDataFrame([(1037694399, 1204615848),(3,None)], 
>>> StructType([StructField("a", IntegerType()), StructField("b", 
>>> IntegerType())]))

>>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
+----------+----------+---------------+
|         a|         b|add((a - 3), b)|
+----------+----------+---------------+
|1037694399|1204615848|     2242310244|
|         3|      null|           null|
+----------+----------+---------------+
{code}

The integer overflow disappears.  This is because arrow and/or pandas changes 
the data type to a float in order to be able to store the null value.  Since 
the processing is then done in floating point, there is no overflow.  This in 
and of itself is annoying but understandable because it is dealing with a 
limitation in pandas. 

Where it becomes a bug is that this happens on a per batch basis.  This means 
that I can have the same two rows in different parts of my data set and get 
different results depending on their proximity to a null value.

{code}
>>> df = spark.createDataFrame([(1037694399, 1204615848),(3,None),(1037694399, 
>>> 1204615848),(3,4)], StructType([StructField("a", IntegerType()), 
>>> StructField("b", IntegerType())]))
>>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
+----------+----------+---------------+
|         a|         b|add((a - 3), b)|
+----------+----------+---------------+
|1037694399|1204615848|     2242310244|
|         3|      null|           null|
|1037694399|1204615848|     2242310244|
|         3|         4|              4|
+----------+----------+---------------+

>>> spark.conf.set('spark.sql.execution.arrow.maxRecordsPerBatch', '2')
>>> df.select(col("a"), col("b"), my_udf(col("a") - 3, col("b"))).show()
+----------+----------+---------------+
|         a|         b|add((a - 3), b)|
+----------+----------+---------------+
|1037694399|1204615848|     2242310244|
|         3|      null|           null|
|1037694399|1204615848|    -2052657052|
|         3|         4|              4|
+----------+----------+---------------+

{code}

For me personally I would prefer to have all nullable integer columns upgraded 
to float all the time, that way it is at least consistent.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32334) Investigate commonizing Columnar and Row data transformations

2020-07-23 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17163576#comment-17163576
 ] 

Robert Joseph Evans commented on SPARK-32334:
-

Row to columnar and columnar to row is mostly figured out.  There are some 
performance improvements that we could probably make in the row to columnar 
transition.  The issue is going to be with a columnar to columnar transition.  
Copying data from one columnar format to another in a performant way is a solvable 
problem, but we might need to special case a few things or do code generation 
if we cannot come up with a good common API.  The issue is going to be with the 
desired batch size.

parquet and orc output a batch size of 4096 rows by default, but each is a 
separate config.
In-memory columnar storage wants 10,000 rows by default, but also has a 
hard-coded soft limit of 4MB compressed.
The arrow config, though, is for a maximum size of 10,000 rows by default.

So I am thinking that we want `SparkPlan` to optionally specify a maximum batch 
size instead of a target size.  The row to columnar transition would just build 
up a batch until it hits the target size or the end of the input iterator.  The 
columnar to columnar transition is a little more complicated. It would have to 
copy out a range of rows from one batch into another batch.  This could mean in 
the worst case that we have one batch come in, in arrow format, but we need to 
copy it  to another batch, so that we can split it up into the target size. 

This should cover the use case for basic map like UDFs.
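
As a sketch of what I mean by building up a batch until it hits the target 
size (plain Scala collections here, not the real InternalRow/ColumnarBatch 
plumbing):

{code}
// Minimal re-batching sketch over a generic iterator; the real transitions
// would copy rows or column ranges into a ColumnarBatch instead of a Seq.
def rebatch[T](rows: Iterator[T], maxRows: Int): Iterator[Seq[T]] =
  new Iterator[Seq[T]] {
    def hasNext: Boolean = rows.hasNext
    def next(): Seq[T] = {
      val buf = scala.collection.mutable.ArrayBuffer.empty[T]
      while (rows.hasNext && buf.size < maxRows) buf += rows.next()
      buf.toSeq
    }
  }
{code}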

For UDFs like `FlatMapCoGroupsInPandasExec` there is no fixed batch size, and 
in fact it takes two iterators as input that are co-grouped together.  If we 
wanted an operator like this to do columnar processing we would have to be able 
to replicate all of that processing, but for columnar Arrow formatted data. 
This is starting to go beyond what I see as the scope of this JIRA and I would 
prefer to stick with just `MapInPandasExec`, `MapPartitionsInRWithArrowExec`, 
and `ArrowEvalPythonExec` for now.  In follow on work we can start to look at 
what it would take to support an ArrowBatchedGroupedIterator, and an 
ArrowBatchedCoGroupedIterator.

> Investigate commonizing Columnar and Row data transformations 
> --
>
> Key: SPARK-32334
> URL: https://issues.apache.org/jira/browse/SPARK-32334
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> We introduced more Columnar Support with SPARK-27396.
> With that we recognized that there is code that is doing very similar 
> transformations from ColumnarBatch or Arrow into InternalRow and vice versa.  
> For instance: 
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L56-L58]
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L389]
> We should investigate if we can commonize that code.
> We are also looking at making the internal caching serialization pluggable to 
> allow for different cache implementations. 
> ([https://github.com/apache/spark/pull/29067]). 
> It was recently brought up that we should investigate if using the data 
> source v2 api makes sense and is feasible for some of these transformations 
> to allow it to be easily extended.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32334) Investigate commonizing Columnar and Row data transformations

2020-07-21 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162147#comment-17162147
 ] 

Robert Joseph Evans commented on SPARK-32334:
-

I think I can get the conversation started here.

{{SparkPlan}} supports a few APIs for columnar processing right now.  
* {{supportsColumnar}} which returns true if {{executeColumnar}} should be 
called to process columnar data.
* {{vectorTypes}} an optional set of class names for the columnar output of 
this stage, which is a performance improvement for the code generation phase of 
converting the data to rows.
* {{executeColumnar}} the main entry point to columnar execution.
* {{doExecuteColumnar}} what users are expected to implement if 
{{supportsColumnar}} returns true.

When {{supportsColumnar}} returns true it is assumed that both the input and 
the output of the stage will be columnar data. With this information 
{{ApplyColumnarRulesAndInsertTransitions}} will insert {{RowToColumnarExec}} 
and {{ColumnarToRowExec}} transitions.  {{ColumnarToRowExec}} is by far the 
more optimized because it is widely used today.
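
For reference, the shape of those hooks is roughly the following (a simplified 
standalone mirror for illustration, not the real 
org.apache.spark.sql.execution.SparkPlan definitions):

{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.vectorized.ColumnarBatch

// Simplified mirror of the columnar hooks described above.
trait ColumnarHooks {
  def supportsColumnar: Boolean = false        // opt in to columnar execution
  def vectorTypes: Option[Seq[String]] = None  // concrete ColumnVector classes, helps codegen
  def executeColumnar(): RDD[ColumnarBatch] = doExecuteColumnar()
  protected def doExecuteColumnar(): RDD[ColumnarBatch]  // what an implementation overrides
}
{code}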

One of the goals of this issue is to try and make something like 
{{ArrowEvalPythonExec}} be columnar.  If we just set {{supportsColumnar}} to 
true for it, the incoming data layout would be columnar, but it most likely 
would not be Arrow formatted, so it would still require some kind of transition 
from one columnar format to an Arrow based format.  There is also no guarantee 
that the size of the batch will correspond to what this operator wants. 
{{RowToColumnarExec}} goes off of the 
{{spark.sql.inMemoryColumnarStorage.batchSize}} config, but 
{{ArrowEvalPythonExec}} uses {{spark.sql.execution.arrow.maxRecordsPerBatch}}.

To get around both of these issues I would propose that we let `SparkPlan` 
optionally ask for both a specific type of input and a specific target size. We 
might also want a better way to say what type of output it is going to produce 
so we can optimize away some transitions if they are not needed.



> Investigate commonizing Columnar and Row data transformations 
> --
>
> Key: SPARK-32334
> URL: https://issues.apache.org/jira/browse/SPARK-32334
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Thomas Graves
>Priority: Major
>
> We introduced more Columnar Support with SPARK-27396.
> With that we recognized that there is code that is doing very similar 
> transformations from ColumnarBatch or Arrow into InternalRow and vice versa.  
> For instance: 
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L56-L58]
> [https://github.com/apache/spark/blob/a4382f7fe1c36a51c64f460c6cb91e93470e0825/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L389]
> We should investigate if we can commonize that code.
> We are also looking at making the internal caching serialization pluggable to 
> allow for different cache implementations. 
> ([https://github.com/apache/spark/pull/29067]). 
> It was recently brought up that we should investigate if using the data 
> source v2 api makes sense and is feasible for some of these transformations 
> to allow it to be easily extended.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32274) Add in the ability for a user to replace the serialization format of the cache

2020-07-10 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155660#comment-17155660
 ] 

Robert Joseph Evans commented on SPARK-32274:
-

I filed [https://github.com/apache/spark/pull/29067] for this.

> Add in the ability for a user to replace the serialization format of the cache
> --
>
> Key: SPARK-32274
> URL: https://issues.apache.org/jira/browse/SPARK-32274
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> Caching a dataset or dataframe can be a very expensive operation, but has a 
> huge benefit for later queries that use it.  There are many use cases that 
> could benefit from caching the data but not enough to justify the current 
> scheme.  I would like to propose that we make the serialization of the 
> caching pluggable.  That way users can explore other formats and compression 
> code.
>  
> As an example I took the line item table from TPCH at a scale factor of 10 
> and converted it to parquet.  This resulted in 2.1 GB of data on disk. With 
> the current caching it can take nearly 8 GB to store that same data in 
> memory, and about 5 GB to store it on disk.
>  
> If I want to read all of that data and write it out again.
> {code:java}
> scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")
> a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint 
> ... 14 more fields]
> scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
> Time taken: 25832 ms {code}
> But a query that reads that data directly from the cache after it is built 
> only takes 21531 ms. For some queries having much more data that can be 
> stored in the cache might be worth the extra query time.
>  
> It also takes a lot less time to do the parquet compression than it 
> does to do the cache compression.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32274) Add in the ability for a user to replace the serialization format of the cache

2020-07-10 Thread Robert Joseph Evans (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-32274:

Description: 
Caching a dataset or dataframe can be a very expensive operation, but has a 
huge benefit for later queries that use it.  There are many use cases that 
could benefit from caching the data but not enough to justify the current 
scheme.  I would like to propose that we make the serialization of the caching 
pluggable.  That way users can explore other formats and compression code.

 

As an example I took the line item table from TPCH at a scale factor of 10 and 
converted it to parquet.  This resulted in 2.1 GB of data on disk. With the 
current caching it can take nearly 8 GB to store that same data in memory, and 
about 5 GB to store it on disk.

 

If I want to read all of that data and write it out again.
{code:java}
scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")
a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint ... 
14 more fields]

scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
Time taken: 25832 ms {code}
But a query that reads that data directly from the cache after it is built only 
takes 21531 ms. For some queries having much more data that can be stored in 
the cache might be worth the extra query time.

 

It also takes a lot less time to do the parquet compression than it does 
to do the cache compression.

  was:
Caching a dataset or dataframe can be a very expensive operation, but has a 
huge benefit for later queries that use it.  There are many use cases that 
could benefit from caching the data but not enough to justify the current 
scheme.  I would like to propose that we make the serialization of the caching 
plugable.  That way users can explore other formats and compression code.

 

As an example I took the line item table from TPCH at a scale factor of 10 and 
converted it to parquet.  This resulted in 2.1 GB of data on disk. With the 
current caching it can take nearly 8 GB to store that same data in memory, and 
about 5 GB to store in on disk.

 

If I want to read all of that data and and write it out again.

```

scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")

a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint ... 
14 more fields]

 

scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
Time taken: 25832 ms 

```

But a query that reads that data directly from the cache after it is built only 
takes 21531 ms. For some queries having much more data that can be stored in 
the cache might be worth the extra query time.

 

It also takes about a lot less time to do the parquet compression than it does 
to do the cache compression.


> Add in the ability for a user to replace the serialization format of the cache
> --
>
> Key: SPARK-32274
> URL: https://issues.apache.org/jira/browse/SPARK-32274
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> Caching a dataset or dataframe can be a very expensive operation, but has a 
> huge benefit for later queries that use it.  There are many use cases that 
> could benefit from caching the data but not enough to justify the current 
> scheme.  I would like to propose that we make the serialization of the 
> caching pluggable.  That way users can explore other formats and compression 
> code.
>  
> As an example I took the line item table from TPCH at a scale factor of 10 
> and converted it to parquet.  This resulted in 2.1 GB of data on disk. With 
> the current caching it can take nearly 8 GB to store that same data in 
> memory, and about 5 GB to store it on disk.
>  
> If I want to read all of that data and write it out again.
> {code:java}
> scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")
> a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint 
> ... 14 more fields]
> scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
> Time taken: 25832 ms {code}
> But a query that reads that data directly from the cache after it is built 
> only takes 21531 ms. For some queries having much more data that can be 
> stored in the cache might be worth the extra query time.
>  
> It also takes a lot less time to do the parquet compression than it 
> does to do the cache compression.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-32274) Add in the ability for a user to replace the serialization format of the cache

2020-07-10 Thread Robert Joseph Evans (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-32274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17155654#comment-17155654
 ] 

Robert Joseph Evans commented on SPARK-32274:
-

If someone could assign this to me that would be great I have a patch I will 
put up shortly.

> Add in the ability for a user to replace the serialization format of the cache
> --
>
> Key: SPARK-32274
> URL: https://issues.apache.org/jira/browse/SPARK-32274
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> Caching a dataset or dataframe can be a very expensive operation, but has a 
> huge benefit for later queries that use it.  There are many use cases that 
> could benefit from caching the data but not enough to justify the current 
> scheme.  I would like to propose that we make the serialization of the 
> caching pluggable.  That way users can explore other formats and compression 
> code.
>  
> As an example I took the line item table from TPCH at a scale factor of 10 
> and converted it to parquet.  This resulted in 2.1 GB of data on disk. With 
> the current caching it can take nearly 8 GB to store that same data in 
> memory, and about 5 GB to store it on disk.
>  
> If I want to read all of that data and write it out again.
> ```
> scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")
> a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint 
> ... 14 more fields]
>  
> scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
> Time taken: 25832 ms 
> ```
> But a query that reads that data directly from the cache after it is built 
> only takes 21531 ms. For some queries having much more data that can be 
> stored in the cache might be worth the extra query time.
>  
> It also takes a lot less time to do the parquet compression than it 
> does to do the cache compression.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32274) Add in the ability for a user to replace the serialization format of the cache

2020-07-10 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-32274:
---

 Summary: Add in the ability for a user to replace the 
serialization format of the cache
 Key: SPARK-32274
 URL: https://issues.apache.org/jira/browse/SPARK-32274
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Robert Joseph Evans


Caching a dataset or dataframe can be a very expensive operation, but has a 
huge benefit for later queries that use it.  There are many use cases that 
could benefit from caching the data but not enough to justify the current 
scheme.  I would like to propose that we make the serialization of the caching 
pluggable.  That way users can explore other formats and compression code.

 

As an example I took the line item table from TPCH at a scale factor of 10 and 
converted it to parquet.  This resulted in 2.1 GB of data on disk. With the 
current caching it can take nearly 8 GB to store that same data in memory, and 
about 5 GB to store it on disk.

 

If I want to read all of that data and write it out again.

```

scala> val a = spark.read.parquet("../data/tpch/SF10_parquet/lineitem.tbl/")

a: org.apache.spark.sql.DataFrame = [l_orderkey: bigint, l_partkey: bigint ... 
14 more fields]

 

scala> spark.time(a.write.mode("overwrite").parquet("./target/tmp"))
Time taken: 25832 ms 

```

But a query that reads that data directly from the cache after it is built only 
takes 21531 ms. For some queries having much more data that can be stored in 
the cache might be worth the extra query time.

 

It also takes a lot less time to do the parquet compression than it does 
to do the cache compression.
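
For comparison, a rough way to time the cache build and the cache read 
separately (assuming the `a` DataFrame from above):

{code}
// Rough timing sketch: the first count() pays the cost of building the cache,
// the second count() reads back from the cache.
val cached = a.cache()
spark.time(cached.count())   // build the cache
spark.time(cached.count())   // read from the cache
{code}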



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32110) -0.0 vs 0.0 is inconsistent

2020-06-26 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-32110:
---

 Summary: -0.0 vs 0.0 is inconsistent
 Key: SPARK-32110
 URL: https://issues.apache.org/jira/browse/SPARK-32110
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


This is related to SPARK-26021 where some things were fixed but there is still 
a lot that is not consistent.

When parsing SQL {{-0.0}} is turned into {{0.0}}. This can produce quick 
results that appear to be correct but are totally inconsistent for the same 
operators.
{code:java}
scala> import spark.implicits._
import spark.implicits._

scala> spark.sql("SELECT 0.0 = -0.0").collect
res0: Array[org.apache.spark.sql.Row] = Array([true])

scala> Seq((0.0, -0.0)).toDF("a", "b").selectExpr("a = b").collect
res1: Array[org.apache.spark.sql.Row] = Array([false])
{code}
This also shows up in sorts
{code:java}
scala> Seq((0.0, -100.0), (-0.0, 100.0), (0.0, 100.0), (-0.0, 
-100.0)).toDF("a", "b").orderBy("a", "b").collect
res2: Array[org.apache.spark.sql.Row] = Array([-0.0,-100.0], [-0.0,100.0], 
[0.0,-100.0], [0.0,100.0])
{code}
But not for an equi-join or for an aggregate
{code:java}
scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
"r_b"), $"a" === $"r_a").collect
res3: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])

scala> Seq((0.0, 1.0), (-0.0, 1.0)).toDF("a", "b").groupBy("a").count.collect
res6: Array[org.apache.spark.sql.Row] = Array([0.0,2])
{code}
This can lead to some very odd results. Like an equi-join with a filter that 
logically should do nothing, but ends up filtering the result to nothing.
{code:java}
scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
"r_b"), $"a" === $"r_a" && $"a" <= $"r_a").collect
res8: Array[org.apache.spark.sql.Row] = Array()

scala> Seq((0.0, -0.0)).toDF("a", "b").join(Seq((-0.0, 0.0)).toDF("r_a", 
"r_b"), $"a" === $"r_a").collect
res9: Array[org.apache.spark.sql.Row] = Array([0.0,-0.0,-0.0,0.0])
{code}
Hive never normalizes -0.0 to 0.0, so this results in non-IEEE-compliant 
behavior everywhere, but at least it is consistently odd.

MySQL, Oracle, Postgres, and SQLite all appear to normalize the {{-0.0}} to 
{{0.0}}.

The root cause of this appears to be that the Java implementation of 
{{Double.compare}} and {{Float.compare}} in OpenJDK places {{-0.0}} < {{0.0}}.

This is not documented in the java docs but it is clearly documented in the 
code, so it is not a "bug" that java is going to fix.

[https://github.com/openjdk/jdk/blob/a0a0539b0d3f9b6809c9759e697bfafd7b138ec1/src/java.base/share/classes/java/lang/Double.java#L1022-L1035]

It is also consistent with what is in the java docs for {{Double.equals}}
 
[https://docs.oracle.com/javase/8/docs/api/java/lang/Double.html#equals-java.lang.Object-]
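
Checking the JDK behavior directly (a small sketch using only documented 
java.lang.Double methods):

{code}
java.lang.Double.compare(-0.0, 0.0)         // -1: the total ordering puts -0.0 before 0.0
-0.0 == 0.0                                 // true: primitive IEEE 754 comparison
java.lang.Double.valueOf(-0.0).equals(0.0)  // false: equals compares the raw bit patterns
{code}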

To be clear I am filing this mostly to document the current state rather than 
to think it needs to be fixed ASAP. It is a rare corner case, but ended up 
being really frustrating for me to debug what was happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28774) ReusedExchangeExec cannot be columnar

2019-08-19 Thread Robert Joseph Evans (Jira)
Robert Joseph Evans created SPARK-28774:
---

 Summary: ReusedExchangeExec cannot be columnar
 Key: SPARK-28774
 URL: https://issues.apache.org/jira/browse/SPARK-28774
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


If a ShuffleExchangeExec is replaced with a columnar version and deduped to a 
ReusedExchangeExec it will fail because ReusedExchangeExec does not implement 
any of the columnar APIs.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-28213) Remove duplication between columnar and ColumnarBatchScan

2019-06-28 Thread Robert Joseph Evans (JIRA)
Robert Joseph Evans created SPARK-28213:
---

 Summary: Remove duplication between columnar and ColumnarBatchScan
 Key: SPARK-28213
 URL: https://issues.apache.org/jira/browse/SPARK-28213
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


There is a lot of duplicate code between Columnar.scala and ColumnarBatchScan.  
This should fix that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-06-04 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Epic Name: Public APIs for extended Columnar Processing Support

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Epic
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data.  The only way 
> to get at the data in a ColumnVector is through the public APIs.  Exposing 
> the underlying format to improve efficiency will be covered in a separate 
> SPIP.
> This is not trying to implement new ways of transferring data to external 
> ML/AI applications.  That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing. 
>  Currently code generation for columnar processing is only supported when 
> translating columns to rows.  We will continue to support this, but will not 
> extend it as a general solution. That will be covered in a separate SPIP if 
> we find it is helpful.  For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through 
> DataSource V2 or any other similar API.  That would be covered by a separate 
> SPIP if we find it is needed.
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Internal implementations of FileFormats, optionally can return a 
> ColumnarBatch instead of rows.  The code generation phase knows how to take 
> that columnar data and iterate through it as rows for stages that want rows, 
> which currently is almost everything.  The limitations here are mostly 
> implementation specific. The current standard is to abuse Scala’s type 
> erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The 
> code generation can handle this because it is generating java code, so it 
> bypasses scala’s type checking and just casts the InternalRow to the desired 
> ColumnarBatch.  This makes it difficult for others to implement the same 
> functionality for different processing because they can only do it through 
> code generation. There really is no clean separate path in the code 
> generation for columnar vs row based. Additionally, because it is only 
> supported through code generation if for any reason code generation would 
> fail there is no backup.  This is typically fine for input formats but can be 
> problematic when we get into more extensive processing.
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the 

[jira] [Created] (SPARK-27945) Make minimal changes to support columnar processing

2019-06-04 Thread Robert Joseph Evans (JIRA)
Robert Joseph Evans created SPARK-27945:
---

 Summary: Make minimal changes to support columnar processing
 Key: SPARK-27945
 URL: https://issues.apache.org/jira/browse/SPARK-27945
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


As the first step for SPARK-27396 this is to put in the minimum changes needed 
to allow a plugin to support columnar processing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-06-04 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Issue Type: Epic  (was: Improvement)

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Epic
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data.  The only way 
> to get at the data in a ColumnVector is through the public APIs.  Exposing 
> the underlying format to improve efficiency will be covered in a separate 
> SPIP.
> This is not trying to implement new ways of transferring data to external 
> ML/AI applications.  That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing. 
>  Currently code generation for columnar processing is only supported when 
> translating columns to rows.  We will continue to support this, but will not 
> extend it as a general solution. That will be covered in a separate SPIP if 
> we find it is helpful.  For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through 
> DataSource V2 or any other similar API.  That would be covered by a separate 
> SPIP if we find it is needed.
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Internal implementations of FileFormats, optionally can return a 
> ColumnarBatch instead of rows.  The code generation phase knows how to take 
> that columnar data and iterate through it as rows for stages that want rows, 
> which currently is almost everything.  The limitations here are mostly 
> implementation specific. The current standard is to abuse Scala’s type 
> erasure to return ColumnarBatches as the elements of an RDD[InternalRow]. The 
> code generation can handle this because it is generating java code, so it 
> bypasses scala’s type checking and just casts the InternalRow to the desired 
> ColumnarBatch.  This makes it difficult for others to implement the same 
> functionality for different processing because they can only do it through 
> code generation. There really is no clean separate path in the code 
> generation for columnar vs row based. Additionally, because it is only 
> supported through code generation if for any reason code generation would 
> fail there is no backup.  This is typically fine for input formats but can be 
> problematic when we get into more extensive processing.
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the cache acts as an input, but 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-05-03 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16832776#comment-16832776
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

[~bryanc]

The nice-to-have Arrow formatting is for internal use only.  Any user that 
wants to use a generic ColumnVector will have to use the java APIs to pull out 
the data and convert it to whatever format that they expect, like what is 
already done today to go to Pandas UDFs.  Eventually I would like to expose 
that, but we cannot really do that until we get some kind of guarantee on the 
stability of the arrow memory layout.  This would just be a performance boost 
for Pandas or anything else that uses arrow formatted data internally.  This is 
why it is just a nice to have.

There are already multiple different batch size configs in play here.  When the 
data is first read in the batch size is set by a config for parquet or orc 
based off of the number of rows.  When the data is cached, there is a separate 
batch size config for that.  When converting from rows to columns for pandas 
there is spark.sql.execution.arrow.maxRecordsPerBatch.  So initially I was 
going to try to honor these as much as possible for backwards compatibility. 
For the translation from rows to columns, out of the box this would just be for 
pandas.  So I would use that config.  I will likely create a new config that is 
more generic for all of these use cases and have the code fall back to the old 
configs if it is not set.
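
Something along these lines (the generic key name below is made up for 
illustration; only the Arrow config is a real setting today):

{code}
// Hypothetical generic batch-size key that falls back to the existing Arrow
// config; "spark.sql.execution.columnar.targetBatchSize" is an invented name.
val targetBatchSize: Int =
  spark.conf.getOption("spark.sql.execution.columnar.targetBatchSize")
    .getOrElse(spark.conf.get("spark.sql.execution.arrow.maxRecordsPerBatch", "10000"))
    .toInt
{code}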

One of the hard parts is sizing the batches properly.  If they are too big you 
run out of memory and you just cannot run at all, which is why I don't see them 
being one batch per partition. If they are too small you could lose the 
advantages of columnar processing.  Also the ideal size will depend on the 
accelerator you are using. CPU based SIMD will want everything to fit in cache. 
 Anything doing a DMA transfer will typically want as large of a size as 
possible without running out of memory, and that is likely to be memory of the 
device not on the host.  So a single generic config for now feels like the 
preferable option until we can get more experience to see what else can be done.

Also be aware that each time we do an exchange the data is partitioned and many 
very small batches may come back out.  In the Facebook talk on their exchange 
work at Spark+AI Summit 2019 they said that the average compressed exchange 
size was a few hundred KB, and that would be for the entire partition.  As 
such, anyone implementing partitioning for an exchange probably also wants, on 
the receiving end, to concatenate the small batches into something closer to 
the size they want.  But that is something for the extensions to handle, at 
least initially.
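
As a rough sketch of that concatenation step, for a single int column only 
(OnHeapColumnVector lives in an internal package, so a real extension might 
well supply its own writable vectors instead):

{code:java}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

// Copies many small single-column int batches into one larger batch.
def concatIntBatches(batches: Seq[ColumnarBatch]): ColumnarBatch = {
  val totalRows = batches.map(_.numRows()).sum
  val out = new OnHeapColumnVector(totalRows, IntegerType)
  for (b <- batches; row <- 0 until b.numRows()) {
    if (b.column(0).isNullAt(row)) out.appendNull()
    else out.appendInt(b.column(0).getInt(row))
  }
  val result = new ColumnarBatch(Array[ColumnVector](out))
  result.setNumRows(totalRows)
  result
}
{code}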

 

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-29 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16829631#comment-16829631
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

I have updated this SPIP to clarify some things and to reduce the scope of it.

We are no longer trying to tackle anything around data movement to/from AI/ML 
systems.

We are no longer trying to expose any Arrow APIs or formatting to end users.  
To avoid any stability issues with either the Arrow API or the Arrow memory 
layout specification we are going to split that part off as a separate SPIP 
that can be added in later.  We will work with the Arrow community to see what 
if any stability guarantees we can get before putting that SPIP forward.

The only APIs that we are going to expose in this first SPIP are through the 
Spark SQL extensions config/API, so that groups that want to do accelerated 
columnar ETL can have an API to do it, even if it is an unstable API.

 

Please take another look, and if there is not much in the way of comments we 
will put it up for another vote.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *SPIP: Columnar Processing Without Arrow Formatting Guarantees.*
>  
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to
>  # Add to the current sql extensions mechanism so advanced users can have 
> access to the physical SparkPlan and manipulate it to provide columnar 
> processing for existing operators, including shuffle.  This will allow them 
> to implement their own cost based optimizers to decide when processing should 
> be columnar and when it should not.
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the users so operations that are not columnar see the 
> data as rows, and operations that are columnar see the data as columns.
>  
> Not Requirements, but things that would be nice to have.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  
> *Q2.* What problem is this proposal NOT designed to solve? 
> The goal of this is not for ML/AI but to provide APIs for accelerated 
> computing in Spark primarily targeting SQL/ETL like workloads.  ML/AI already 
> have several mechanisms to get data into/out of them. These can be improved 
> but will be covered in a separate SPIP.
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation.
> This does not cover exposing the underlying format of the data.  The only way 
> to get at the data in a ColumnVector is through the public APIs.  Exposing 
> the underlying format to improve efficiency will be covered in a separate 
> SPIP.
> This is not trying to implement new ways of transferring data to external 
> ML/AI applications.  That is covered by separate SPIPs already.
> This is not trying to add in generic code generation for columnar processing. 
>  Currently code generation for columnar processing is only supported when 
> translating columns to rows.  We will continue to support this, but will not 
> extend it as a general solution. That will be covered in a separate SPIP if 
> we find it is helpful.  For now columnar processing will be interpreted.
> This is not trying to expose a way to get columnar data into Spark through 
> DataSource V2 or any other similar API.  That would be covered by a separate 
> SPIP if we find it is needed.
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Internal implementations of FileFormats, optionally can return a 
> ColumnarBatch instead of rows.  The code generation phase knows how to take 
> that columnar data and iterate through it as rows for stages that wants rows, 
> which currently is almost everything.  The limitations here are mostly 
> implementation specific. The current standard is to abuse Scala’s type 
> erasure to return ColumnarBatches as the elements of an 

[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-29 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Description: 
*SPIP: Columnar Processing Without Arrow Formatting Guarantees.*

 

*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

The Dataset/DataFrame API in Spark currently only exposes to users one row at a 
time when processing data.  The goals of this are to
 # Add to the current sql extensions mechanism so advanced users can have 
access to the physical SparkPlan and manipulate it to provide columnar 
processing for existing operators, including shuffle.  This will allow them to 
implement their own cost based optimizers to decide when processing should be 
columnar and when it should not.
 # Make any transitions between the columnar memory layout and a row based 
layout transparent to the users so operations that are not columnar see the 
data as rows, and operations that are columnar see the data as columns.

 

Not Requirements, but things that would be nice to have.
 # Transition the existing in memory columnar layouts to be compatible with 
Apache Arrow.  This would make the transformations to Apache Arrow format a 
no-op. The existing formats are already very close to those layouts in many 
cases.  This would not be using the Apache Arrow java library, but instead 
being compatible with the memory 
[layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
subset of that layout.

 

*Q2.* What problem is this proposal NOT designed to solve? 

The goal of this is not for ML/AI but to provide APIs for accelerated computing 
in Spark primarily targeting SQL/ETL like workloads.  ML/AI already have 
several mechanisms to get data into/out of them. These can be improved but will 
be covered in a separate SPIP.

This is not trying to implement any of the processing itself in a columnar way, 
with the exception of examples for documentation.

This does not cover exposing the underlying format of the data.  The only way 
to get at the data in a ColumnVector is through the public APIs.  Exposing the 
underlying format to improve efficiency will be covered in a separate SPIP.

This is not trying to implement new ways of transferring data to external ML/AI 
applications.  That is covered by separate SPIPs already.

This is not trying to add in generic code generation for columnar processing.  
Currently code generation for columnar processing is only supported when 
translating columns to rows.  We will continue to support this, but will not 
extend it as a general solution. That will be covered in a separate SPIP if we 
find it is helpful.  For now columnar processing will be interpreted.

This is not trying to expose a way to get columnar data into Spark through 
DataSource V2 or any other similar API.  That would be covered by a separate 
SPIP if we find it is needed.

 

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to 3 areas.
 # Internal implementations of FileFormats can optionally return a 
ColumnarBatch instead of rows.  The code generation phase knows how to take 
that columnar data and iterate through it as rows for stages that want rows, 
which currently is almost everything.  The limitations here are mostly 
implementation specific. The current standard is to abuse Scala’s type erasure 
to return ColumnarBatches as the elements of an RDD[InternalRow] (a short 
sketch of this pattern follows the list). The code generation can handle this 
because it is generating java code, so it bypasses Scala’s type checking and 
just casts the InternalRow to the desired ColumnarBatch.  This makes it 
difficult for others to implement the same functionality for different 
processing because they can only do it through code generation. There really is 
no clean separate path in the code generation for columnar vs. row based 
processing. Additionally, because this is only supported through code 
generation, there is no backup if code generation fails for any reason.  This 
is typically fine for input formats but can be problematic when we get into 
more extensive processing.
 # When caching data it can optionally be cached in a columnar format if the 
input is also columnar.  This is similar to the first area and has the same 
limitations because the cache acts as an input, but it is the only piece of 
code that also consumes columnar data as an input.
 # Pandas vectorized processing.  To be able to support Pandas UDFs Spark will 
build up a batch of data and send it to python for processing, and then get a 
batch of data back as a result.  The format of the data being sent to python 
can either be pickle, which is the default, or optionally Arrow. The result 
returned is the same format. The limitations here really are around 
performance.  Transforming the data back and forth can be very expensive.
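
A short sketch of the type erasure pattern described in item 1, simplified and 
purely illustrative (the real code lives in the columnar file scans and the 
generated code that consumes them):

{code:java}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// The scan really produces ColumnarBatch elements but declares RDD[InternalRow];
// erasure means nothing checks this until the generated code casts each element back.
def disguise(batches: RDD[ColumnarBatch]): RDD[InternalRow] =
  batches.asInstanceOf[RDD[InternalRow]]

// What the generated java code effectively does with every element it receives.
def recover(rows: RDD[InternalRow]): RDD[ColumnarBatch] =
  rows.asInstanceOf[RDD[ColumnarBatch]]
{code}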

 

*Q4.* What is new in 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-20 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16822411#comment-16822411
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

[~mengxr],

My goal is to provide a framework for columnar processing, just like the SPIP 
says. SPARK-24579 is for data exchange. If you have issues with exposing the 
Arrow Java APIs then mention it on that JIRA.

My primary goal is actually not DL/ML but ETL.  Columnar happens to be a common 
thread between them because it has traditionally been the preferred memory 
layout for doing accelerated numerical computation.

Python already supports a public columnar API: the pandas UDFs.  The issue is 
the overhead of doing a data exchange.  And you are right about the data 
pipelining in the GPU model, which is why ultimately you will need some form of 
a cost based optimizer to determine if the operations within a stage are enough 
to justify the data movement and/or data translations involved.  But there are 
more choices for accelerated computing besides just GPUs, which is why, besides 
adding in the framework, we are also allowing extensions to override the 
computation model so anyone can provide a new backend to Spark.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-16 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819592#comment-16819592
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

This SPIP is to put a framework in place to be able to support columnar 
processing, but not to actually implement that processing. Implementations 
would be provided by extensions.  Those extensions would have a few options on 
how to allocate memory for results and/or intermediate results.  The contract 
really is just around the ColumnarBatch and ColumnVector classes, so an 
extension could use built-in implementations, similar to the on-heap, off-heap, 
and Arrow column vector implementations in the current version of Spark.  We 
would provide a config for what the default should be and an API to allocate 
one of these vectors based on that config, but in some cases the extension may 
want to supply its own implementation, similar to how the ORC FileFormat 
currently does.
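
A sketch of what such an allocation helper might boil down to for an extension 
that is happy with the built-in vectors.  These classes are in an internal 
package today, and the on-heap/off-heap choice here stands in for the proposed 
config:

{code:java}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

// Allocates writable vectors for the whole schema; the caller fills them and
// then calls setNumRows on the returned batch.
def allocateBatch(schema: StructType, capacity: Int, offHeap: Boolean): ColumnarBatch = {
  val vectors: Array[ColumnVector] =
    if (offHeap) OffHeapColumnVector.allocateColumns(capacity, schema).map(v => v: ColumnVector)
    else OnHeapColumnVector.allocateColumns(capacity, schema).map(v => v: ColumnVector)
  new ColumnarBatch(vectors)
}
{code}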

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating java code, so it bypasses scala’s type checking 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-16 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16819125#comment-16819125
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

[~bryanc],

I see your point that if this is for data exchange the API should be 
{{RDD[ArrowRecordBatch]}} and {{arrow_udf}}.  {{ArrowRecordBatch}} is an IPC 
class and is made for exchanging data, so it should work out of the box and be 
simpler for end users to deal with.  If it is for doing in-place data 
processing, not sending it to another system, then I think we want something 
based on {{ColumnarBatch}}.  Since in-place columnar data processing is hard, 
initially limiting it to just the extensions API feels preferable.  If others 
are okay with that I will drop {{columnar_udf}} and {{RDD[ColumnarBatch]}} from 
this proposal, and just make sure that we have a good way of translating 
between {{ColumnarBatch}} and {{ArrowRecordBatch}} so we can play nicely with 
SPARK-24579.  In the future, if we find that advanced users do want columnar 
processing UDFs, we can discuss ways to properly expose them at that point.
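
For the Arrow-to-Spark half of that translation, the work is already close to a 
no-op today because Spark ships an {{ArrowColumnVector}} wrapper.  A rough 
sketch, assuming the Arrow data has already been loaded into a VectorSchemaRoot 
(the reverse direction generally still needs a copy):

{code:java}
import scala.collection.JavaConverters._

import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}

// Wraps Arrow vectors as Spark ColumnVectors without copying the data.
def wrapArrow(root: VectorSchemaRoot): ColumnarBatch = {
  val cols: Array[ColumnVector] =
    root.getFieldVectors.asScala.map(v => new ArrowColumnVector(v): ColumnVector).toArray
  val batch = new ColumnarBatch(cols)
  batch.setNumRows(root.getRowCount)
  batch
}
{code}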

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-15 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817990#comment-16817990
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

There are actually a few public facing APIs I think we would need to expose 
corresponding to different use cases.

 

The ML use case needs access to the data for easy transfer, but also needs to 
know when it has all of the data it is expecting. Typically this is done by 
converting the dataset into an {{RDD}}, which can expose an iterator so you 
know you have all of the data. The simplest, and probably the most memory 
efficient, solution is to expose the underlying {{RDD[ColumnarBatch]}}. We 
could do this with a new API on Dataset similar to {{rdd()}} or {{javaRdd()}}, 
perhaps something called {{columnarRdd()}} or {{columnarJavaRdd()}}, but I 
would need to think about the exact name a bit. This is the use case that 
really drives my wanting to have a columnar shuffle implementation on the CPU, 
because the typical pattern is to load the data from a file and then 
repartition the data for training before doing the training. Many of the input 
formats (Parquet and ORC) already are columnar, so we could support that entire 
use case without transforming the data from columns to rows.
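
A hypothetical usage of that proposed API.  Neither {{columnarRdd()}} nor the 
columnar CPU shuffle exists today; the names are placeholders from this 
discussion, so this is a sketch of intent rather than working code:

{code:java}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.vectorized.ColumnarBatch

// Parquet scans already produce columnar data, so with a columnar shuffle the
// whole pipeline below could avoid any column-to-row conversion.
def trainingBatches(spark: SparkSession, path: String, parts: Int): RDD[ColumnarBatch] = {
  val df = spark.read.parquet(path)
  df.repartition(parts).columnarRdd()   // proposed API, does not exist yet
}
{code}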

An RDD based API, however, is far from ideal for data processing, as we would 
lose all of the advantages of the Dataset for query optimization and rewriting. 
For that use case I think we should provide a columnar UDF API, based off of 
the pandas UDF APIs. At a minimum I would say that we start off with a scalar 
columnar UDF that would look like a regular UDF, but the input, instead of 
being simple types, would be {{ColumnVector}}, and so would the output. But 
because a ColumnVector is not typed itself we could not infer the output format 
from the API, so we would need the user to declare it for us, something kind of 
like:
{code:java}
def columnar_udf((a) -> /* do something with a*/, BooleanType)
{code}
In the future we could add in support for the more complex forms, like grouped 
map and grouped aggregate UDFs, if they prove popular. These should be fairly 
simple to do, but I don't really want to commit to them in the first version of 
the code, though we would want the API written with that in mind.
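
To illustrate what the body of such a scalar columnar UDF might do, here is a 
sketch that reads an int ColumnVector and produces a boolean one.  The 
surrounding {{columnar_udf}} registration above is part of the proposal and 
does not exist; OnHeapColumnVector is just used as a convenient writable 
vector:

{code:java}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.vectorized.ColumnVector

// One input column in, one output column out, preserving nulls.
def isPositive(a: ColumnVector, numRows: Int): ColumnVector = {
  val out = new OnHeapColumnVector(numRows, BooleanType)
  var i = 0
  while (i < numRows) {
    if (a.isNullAt(i)) out.appendNull() else out.appendBoolean(a.getInt(i) > 0)
    i += 1
  }
  out
}
{code}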

The final API is to allow advanced framework users to add or replace the 
columnar processing of a physical plan. This would allow someone to add in, 
say, GPU support as the backend for data processing, or SIMD-optimized CPU 
processing, or any other type of accelerated columnar processing. This would be 
done through the experimental and unstable `spark.sql.extensions` config and 
the SparkSessionExtensions API. It would primarily provide some places to 
insert a `Rule[SparkPlan]` into the execution phase of Catalyst so framework 
writers could use the APIs I described previously to implement columnar 
versions of the various operators.  This would also allow columnar-specific 
plan optimizations to be implemented using the same API.
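
For reference, a sketch of that hook using the ColumnarRule/injectColumnar API 
that eventually shipped in Spark 3.0.  MyColumnarOverrides is a hypothetical 
rule that a real framework would use to swap supported operators for columnar 
implementations:

{code:java}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Hypothetical rule: a real implementation would replace supported operators
// with columnar versions (GPU, SIMD, etc.); this one is a no-op placeholder.
object MyColumnarOverrides extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan
}

// Registered through the extensions config, e.g.
//   --conf spark.sql.extensions=com.example.MyExtensions
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit =
    ext.injectColumnar(_ => new ColumnarRule {
      override def preColumnarTransitions: Rule[SparkPlan] = MyColumnarOverrides
    })
}
{code}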

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure 

[jira] [Commented] (SPARK-26413) SPIP: RDD Arrow Support in Spark Core and PySpark

2019-04-13 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817081#comment-16817081
 ] 

Robert Joseph Evans commented on SPARK-26413:
-

SPARK-27396 covers this, but with a slightly different approach.

> SPIP: RDD Arrow Support in Spark Core and PySpark
> -
>
> Key: SPARK-26413
> URL: https://issues.apache.org/jira/browse/SPARK-26413
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, Spark Core
>Affects Versions: 3.0.0
>Reporter: Richard Whitcomb
>Priority: Minor
>
> h2. Background and Motivation
> Arrow is becoming an standard interchange format for columnar Structured 
> Data.  This is already true in Spark with the use of arrow in the pandas udf 
> functions in the dataframe API.
> However the current implementation of arrow in spark is limited to two use 
> cases.
>  * Pandas UDF that allows for operations on one or more columns in the 
> DataFrame API.
>  * Collect as Pandas which pulls back the entire dataset to the driver in a 
> Pandas Dataframe.
> What is still hard however is making use of all of the columns in a Dataframe 
> while staying distributed across the workers.  The only way to do this 
> currently is to drop down into RDDs and collect the rows into a dataframe. 
> However pickling is very slow and the collecting is expensive.
> The proposal is to extend spark in a way that allows users to operate on an 
> Arrow Table fully while still making use of Spark's underlying technology.  
> Some examples of possibilities with this new API. 
>  * Pass the Arrow Table with Zero Copy to PyTorch for predictions.
>  * Pass to Nvidia Rapids for an algorithm to be run on the GPU.
>  * Distribute data across many GPUs making use of the new Barriers API.
> h2. Targets users and personas
> ML, Data Scientists, and future library authors..
> h2. Goals
>  * Conversion from any Dataset[Row] or PySpark Dataframe to RDD[Table]
>  * Conversion back from any RDD[Table] to Dataset[Row], RDD[Row], Pyspark 
> Dataframe
>  * Open the possibilities to tighter integration between Arrow/Pandas/Spark 
> especially at a library level.
> h2. Non-Goals
>  * Not creating a new API but instead using existing APIs.
> h2. Proposed API changes
> h3. Data Objects
> case class ArrowTable(schema: Schema, batches: Iterable[ArrowRecordBatch])
> h3. Dataset.scala
> {code:java}
> // Converts a Dataset to an RDD of Arrow Tables
> // Each RDD row is an Interable of Arrow Batches.
> def arrowRDD: RDD[ArrowTable]
>  
> // Utility Function to convert to RDD Arrow Table for PySpark
> private[sql] def javaToPythonArrow: JavaRDD[Array[Byte]]
> {code}
> h3. RDD.scala
> {code:java}
>  // Converts RDD[ArrowTable] to an Dataframe by inspecting the Arrow Schema
>  def arrowToDataframe(implicit ev: T <:< ArrowTable): Dataframe
>   
>  // Converts RDD[ArrowTable] to an RDD of Rows
>  def arrowToRDD(implicit ev: T <:< ArrowTable): RDD[Row]{code}
> h3. Serializers.py
> {code:java}
> # Serializer that takes a Serialized Arrow Tables and returns a pyarrow Table.
> class ArrowSerializer(FramedSerializer)
> {code}
> h3. RDD.py
> {code}
> # New RDD Class that has an RDD[ArrowTable] behind it and uses the new 
> ArrowSerializer instead of the normal Pickle Serializer
> class ArrowRDD(RDD){code}
>  
> h3. Dataframe.py
> {code}
> // New Function that converts a pyspark dataframe into an ArrowRDD
> def arrow(self):
> {code}
>  
> h2. Example API Usage
> h3. Pyspark
> {code}
> # Select a Single Column Using Pandas
> def map_table(arrow_table):
>   import pyarrow as pa
>   pdf = arrow_table.to_pandas()
>   pdf = pdf[['email']]
>   return pa.Table.from_pandas(pdf)
> # Convert to Arrow RDD, map over tables, convert back to dataframe
> df.arrow.map(map_table).dataframe 
> {code}
> h3. Scala
>  
> {code:java}
> // Find N Centroids using Cuda Rapids kMeans
> def runCuKmeans(table: ArrowTable, clusters: Int): ArrowTable
>  
> // Convert Dataset[Row] to RDD[ArrowTable] and back to Dataset[Row]
> df.arrowRDD.map(table => runCuKmeans(table, N)).arrowToDataframe.show(10)
> {code}
>  
> h2. Implementation Details
> As mentioned in the first section, the goal is to make it easier for Spark 
> users to interact with Arrow tools and libraries.  This however does come 
> with some considerations from a Spark perspective.
>  Arrow is column based instead of Row based.  In the above API proposal of 
> RDD[ArrowTable] each RDD row will in fact be a block of data.  Another 
> proposal in this regard is to introduce a new parameter to Spark called 
> arrow.sql.execution.arrow.maxRecordsPerTable.  The goal of this parameter is 
> to decide how many records are included in a single Arrow Table.  If set to 
> -1 the entire partition will be included in the table else to that number. 
> Within that number the normal 

[jira] [Commented] (SPARK-24579) SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks

2019-04-13 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16817080#comment-16817080
 ] 

Robert Joseph Evans commented on SPARK-24579:
-

This SPIP SPARK-27396 covers a superset of the functionality described here.

> SPIP: Standardize Optimized Data Exchange between Spark and DL/AI frameworks
> 
>
> Key: SPARK-24579
> URL: https://issues.apache.org/jira/browse/SPARK-24579
> Project: Spark
>  Issue Type: Epic
>  Components: ML, PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Major
>  Labels: Hydrogen
> Attachments: [SPARK-24579] SPIP_ Standardize Optimized Data Exchange 
> between Apache Spark and DL%2FAI Frameworks .pdf
>
>
> (see attached SPIP pdf for more details)
> At the crossroads of big data and AI, we see both the success of Apache Spark 
> as a unified
> analytics engine and the rise of AI frameworks like TensorFlow and Apache 
> MXNet (incubating).
> Both big data and AI are indispensable components to drive business 
> innovation and there have
> been multiple attempts from both communities to bring them together.
> We saw efforts from AI community to implement data solutions for AI 
> frameworks like tf.data and tf.Transform. However, with 50+ data sources and 
> built-in SQL, DataFrames, and Streaming features, Spark remains the community 
> choice for big data. This is why we saw many efforts to integrate DL/AI 
> frameworks with Spark to leverage its power, for example, TFRecords data 
> source for Spark, TensorFlowOnSpark, TensorFrames, etc. As part of Project 
> Hydrogen, this SPIP takes a different angle at Spark + AI unification.
> None of the integrations are possible without exchanging data between Spark 
> and external DL/AI frameworks. And the performance matters. However, there 
> doesn’t exist a standard way to exchange data and hence implementation and 
> performance optimization fall into pieces. For example, TensorFlowOnSpark 
> uses Hadoop InputFormat/OutputFormat for TensorFlow’s TFRecords to load and 
> save data and pass the RDD records to TensorFlow in Python. And TensorFrames 
> converts Spark DataFrames Rows to/from TensorFlow Tensors using TensorFlow’s 
> Java API. How can we reduce the complexity?
> The proposal here is to standardize the data exchange interface (or format) 
> between Spark and DL/AI frameworks and optimize data conversion from/to this 
> interface.  So DL/AI frameworks can leverage Spark to load data virtually 
> from anywhere without spending extra effort building complex data solutions, 
> like reading features from a production data warehouse or streaming model 
> inference. Spark users can use DL/AI frameworks without learning specific 
> data APIs implemented there. And developers from both sides can work on 
> performance optimizations independently given the interface itself doesn’t 
> introduce big overhead.






[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-11 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815846#comment-16815846
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

[~kiszk],

 

The exact details of some of the APIs are going to be worked out once I better 
understand some of the internal details of how code generation works, and how 
we are going to be able to support frameworks inserting columnar execution into 
the physical plan.  I am looking at these right now, but the high level APIs 
should remain more or less the same.

 

In general the APIs should look a lot like the existing columnar APIs.

 

There will be a ColumnarBatch and ColumnVectors, but new versions in a new 
package, both to allow for backwards compatibility with a clean transition and 
because Expression will need access to these classes while the current classes 
are in the wrong jar for that to work.  We will likely clean up the APIs some 
to make them cleaner as a more formal public API.  We would formalize an 
Arrow-formatted ColumnVector implementation for more general use and provide 
APIs to translate the existing columnar data into Arrow-formatted data.  If we 
do our job right, those translations will be no-ops in many cases because the 
data will be Arrow formatted already.

 

Catalyst Expression and SparkPlan will have a few new methods added to them 
that should follow the existing form of evaluation, etc.

 
{code}
def supportsColumnar: Boolean = false

// For Expression

def evalColumnar(batch: ColumnarBatch): Any = {
  // Throw exception not supported
}

def genColumnarCode(ctx: CodegenContext): ExprCode = {
  ...
}

// For SparkPlan

def executeColumnar(): RDD[ColumnarBatch] = {
  ...
}

// For CodegenSupport

doColumnarProduce...
doColumnarConsume...
{code}
 

We would then be able to look at the SparkPlan and Expressions, similar to what 
is done for WholeStageCodegen, and insert transitions between columnar and row 
based data formats and execution paths as needed.
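
A very simplified sketch of that transition insertion, using the methods 
proposed above (the logic that shipped in Spark 3.0 is more involved and also 
inserts row-to-columnar transitions):

{code:java}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}

// Wherever a row-based operator has a columnar child, insert a columnar-to-row
// transition. (The reverse, RowToColumnarExec, is handled the same way in 3.0.)
object InsertRowTransitions extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case p if !p.supportsColumnar && p.children.exists(_.supportsColumnar) =>
      p.withNewChildren(p.children.map {
        case c if c.supportsColumnar => ColumnarToRowExec(c)
        case c => c
      })
  }
}
{code}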

 

I hope that gives you enough detail.

 

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-11 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16815426#comment-16815426
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

Thanks [~tgraves], I updated the JIRA with you as the shepherd.  Does anyone 
have anything more to discuss, or can we call a vote on it?

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating java code, so it bypasses scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row based. 
> Additionally because it is only supported through code generation if for any 
> reason code generation would fail there is no backup.  This is typically fine 
> for input formats but can be problematic when we get into more extensive 
> processing. 
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also 

[jira] [Updated] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-11 Thread Robert Joseph Evans (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Joseph Evans updated SPARK-27396:

Shepherd: Thomas Graves

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature it makes 
> that impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats, optionally can return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that wants rows, which currently is almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating java code, so it bypasses scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing because they can only do it through code generation. There really 
> is no clean separate path in the code generation for columnar vs row based. 
> Additionally because it is only supported through code generation if for any 
> reason code generation would fail there is no backup.  This is typically fine 
> for input formats but can be problematic when we get into more extensive 
> processing. 
>  # When caching data it can optionally be cached in a columnar format if the 
> input is also columnar.  This is similar to the first area and has the same 
> limitations because the cache acts as an input, but it is the only piece of 
> code that 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-10 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814883#comment-16814883
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

This SPIP has been up for 5 days and I see 10 people following it, but there 
has been no discussion since. Are you still digesting the proposal?  Are you 
just busy with an upcoming conference and haven't had time to look at it?  From 
the previous discussion it sounded like the core of what I am proposing is not 
that controversial, so I would like to move forward with it sooner rather than 
later, but I also want to give everyone time to understand it and ask questions.

Also, I am looking for someone to be the shepherd.  I can technically do it, 
being a PMC member, but I have not been that active until recently, so to avoid 
any concerns I would prefer to find someone else to be the shepherd.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature that 
> would be impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats can optionally return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that want rows, which is currently almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating Java code, so it bypasses Scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to 

[jira] [Commented] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-05 Thread Robert Joseph Evans (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16811183#comment-16811183
 ] 

Robert Joseph Evans commented on SPARK-27396:
-

I have initially kept this at a high level, just answering the questions in the 
SPIP questionnaire.  I am not totally sure where a design doc, or an example of 
some of the things we want to do with the APIs, would fit into all of this.

 

I am happy to work on design docs or share more details about how I see the 
APIs working as needed.

> SPIP: Public APIs for extended Columnar Processing Support
> --
>
> Key: SPARK-27396
> URL: https://issues.apache.org/jira/browse/SPARK-27396
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Robert Joseph Evans
>Priority: Major
>
> *Q1.* What are you trying to do? Articulate your objectives using absolutely 
> no jargon.
>  
> The Dataset/DataFrame API in Spark currently only exposes to users one row at 
> a time when processing data.  The goals of this are to 
>  
>  # Expose to end users a new option of processing the data in a columnar 
> format, multiple rows at a time, with the data organized into contiguous 
> arrays in memory. 
>  # Make any transitions between the columnar memory layout and a row based 
> layout transparent to the end user.
>  # Allow for simple data exchange with other systems, DL/ML libraries, 
> pandas, etc. by having clean APIs to transform the columnar data into an 
> Apache Arrow compatible layout.
>  # Provide a plugin mechanism for columnar processing support so an advanced 
> user could avoid data transition between columnar and row based processing 
> even through shuffles. This means we should at least support pluggable APIs 
> so an advanced end user can implement the columnar partitioning themselves, 
> and provide the glue necessary to shuffle the data still in a columnar format.
>  # Expose new APIs that allow advanced users or frameworks to implement 
> columnar processing either as UDFs, or by adjusting the physical plan to do 
> columnar processing.  If the latter is too controversial we can move it to 
> another SPIP, but we plan to implement some accelerated computing in parallel 
> with this feature to be sure the APIs work, and without this feature that 
> would be impossible.
>  
> Not Requirements, but things that would be nice to have.
>  # Provide default implementations for partitioning columnar data, so users 
> don’t have to.
>  # Transition the existing in memory columnar layouts to be compatible with 
> Apache Arrow.  This would make the transformations to Apache Arrow format a 
> no-op. The existing formats are already very close to those layouts in many 
> cases.  This would not be using the Apache Arrow java library, but instead 
> being compatible with the memory 
> [layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
> subset of that layout.
>  # Provide a clean transition from the existing code to the new one.  The 
> existing APIs which are public but evolving are not that far off from what is 
> being proposed.  We should be able to create a new parallel API that can wrap 
> the existing one. This means any file format that is trying to support 
> columnar can still do so until we make a conscious decision to deprecate and 
> then turn off the old APIs.
>  
> *Q2.* What problem is this proposal NOT designed to solve?
> This is not trying to implement any of the processing itself in a columnar 
> way, with the exception of examples for documentation, and possibly default 
> implementations for partitioning of columnar shuffle. 
>  
> *Q3.* How is it done today, and what are the limits of current practice?
> The current columnar support is limited to 3 areas.
>  # Input formats can optionally return a ColumnarBatch instead of rows.  The 
> code generation phase knows how to take that columnar data and iterate 
> through it as rows for stages that want rows, which is currently almost 
> everything.  The limitations here are mostly implementation specific. The 
> current standard is to abuse Scala’s type erasure to return ColumnarBatches 
> as the elements of an RDD[InternalRow]. The code generation can handle this 
> because it is generating Java code, so it bypasses Scala’s type checking and 
> just casts the InternalRow to the desired ColumnarBatch.  This makes it 
> difficult for others to implement the same functionality for different 
> processing, because they can only do it through code generation. There really 
> is no clean, separate path in the code generation for columnar vs. row based 
> processing. Additionally, because this is only supported through code 
> generation, there is no fallback if code generation fails for any reason.  
> This is 

[jira] [Created] (SPARK-27396) SPIP: Public APIs for extended Columnar Processing Support

2019-04-05 Thread Robert Joseph Evans (JIRA)
Robert Joseph Evans created SPARK-27396:
---

 Summary: SPIP: Public APIs for extended Columnar Processing Support
 Key: SPARK-27396
 URL: https://issues.apache.org/jira/browse/SPARK-27396
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Robert Joseph Evans


*Q1.* What are you trying to do? Articulate your objectives using absolutely no 
jargon.

 

The Dataset/DataFrame API in Spark currently only exposes to users one row at a 
time when processing data.  The goals of this are to 

 
 # Expose to end users a new option of processing the data in a columnar 
format, multiple rows at a time, with the data organized into contiguous arrays 
in memory. 
 # Make any transitions between the columnar memory layout and a row based 
layout transparent to the end user (see the sketch after this list).
 # Allow for simple data exchange with other systems, DL/ML libraries, pandas, 
etc. by having clean APIs to transform the columnar data into an Apache Arrow 
compatible layout.
 # Provide a plugin mechanism for columnar processing support so an advanced 
user could avoid data transition between columnar and row based processing even 
through shuffles. This means we should at least support pluggable APIs so an 
advanced end user can implement the columnar partitioning themselves, and 
provide the glue necessary to shuffle the data still in a columnar format.
 # Expose new APIs that allow advanced users or frameworks to implement 
columnar processing either as UDFs, or by adjusting the physical plan to do 
columnar processing.  If the latter is too controversial we can move it to 
another SPIP, but we plan to implement some accelerated computing in parallel 
with this feature to be sure the APIs work, and without this feature that would 
be impossible.
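
To illustrate the transition in goal 2, here is a short, self-contained sketch
that uses the ColumnarBatch and OnHeapColumnVector classes already in Spark
(illustrative only, and assuming their current public constructors and the
rowIterator method; it is not part of the proposed API): the data sits in
contiguous column vectors, yet a consumer can still walk it row by row.

{code:scala}
// Sketch only: an existing columnar batch being read through a row-based view.
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

// One column of longs stored as a contiguous on-heap array.
val col = new OnHeapColumnVector(3, LongType)
col.putLong(0, 10L)
col.putLong(1, 20L)
col.putLong(2, 30L)

val batch = new ColumnarBatch(Array[ColumnVector](col))
batch.setNumRows(3)

// Row-based view over the columnar storage; roughly what generated code
// iterates today when a stage wants rows.
val it = batch.rowIterator()
while (it.hasNext) {
  println(it.next().getLong(0))
}
batch.close()
{code}

The goal is for this kind of transition, in either direction, to be something
the planner inserts for the user rather than something only generated code
knows how to do.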

 

Not Requirements, but things that would be nice to have.
 # Provide default implementations for partitioning columnar data, so users 
don’t have to (a hypothetical shape for this is sketched after this list).
 # Transition the existing in memory columnar layouts to be compatible with 
Apache Arrow.  This would make the transformations to Apache Arrow format a 
no-op. The existing formats are already very close to those layouts in many 
cases.  This would not be using the Apache Arrow java library, but instead 
being compatible with the memory 
[layout|https://arrow.apache.org/docs/format/Layout.html] and possibly only a 
subset of that layout.
 # Provide a clean transition from the existing code to the new one.  The 
existing APIs which are public but evolving are not that far off from what is 
being proposed.  We should be able to create a new parallel API that can wrap 
the existing one. This means any file format that is trying to support columnar 
can still do so until we make a conscious decision to deprecate and then turn 
off the old APIs.
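
To make the partitioning item above more concrete, here is one purely
hypothetical shape such a plugin point could take; the trait name and signature
below are invented for illustration and nothing like them exists in Spark
today.

{code:scala}
// Hypothetical API sketch only; the name and signature are invented here.
import org.apache.spark.sql.vectorized.ColumnarBatch

trait ColumnarPartitioner extends Serializable {
  /** Number of shuffle partitions this partitioner produces. */
  def numPartitions: Int

  /**
   * Split one input batch into (partitionId, slice) pairs, keeping each slice
   * columnar so the shuffle never has to fall back to a row-by-row path.
   */
  def partition(batch: ColumnarBatch): Iterator[(Int, ColumnarBatch)]
}
{code}

Default implementations of something like this are what the first nice-to-have
would provide, so most users would never need to write their own.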

 

*Q2.* What problem is this proposal NOT designed to solve?

This is not trying to implement any of the processing itself in a columnar way, 
with the exception of examples for documentation, and possibly default 
implementations for partitioning of columnar shuffle. 

 

*Q3.* How is it done today, and what are the limits of current practice?

The current columnar support is limited to 3 areas.
 # Input formats can optionally return a ColumnarBatch instead of rows.  The 
code generation phase knows how to take that columnar data and iterate through 
it as rows for stages that want rows, which is currently almost everything.  
The limitations here are mostly implementation specific. The current standard 
is to abuse Scala’s type erasure to return ColumnarBatches as the elements of 
an RDD[InternalRow]. The code generation can handle this because it is 
generating Java code, so it bypasses Scala’s type checking and just casts the 
InternalRow to the desired ColumnarBatch.  This makes it difficult for others 
to implement the same functionality for different processing, because they can 
only do it through code generation. There really is no clean, separate path in 
the code generation for columnar vs. row based processing. Additionally, 
because this is only supported through code generation, there is no fallback if 
code generation fails for any reason.  This is typically fine for input formats 
but can be problematic when we get into more extensive processing. 
 # When caching data it can optionally be cached in a columnar format if the 
input is also columnar.  This is similar to the first area and has the same 
limitations because the cache acts as an input, but it is the only piece of 
code that also consumes columnar data as an input.
 # Pandas vectorized processing.  To be able to support Pandas UDFs, Spark will 
build up a batch of data and send it to Python for processing, and then get a 
batch of data back as a result.  The format of the data being sent to Python 
can either be pickle, which is the default, or optionally Arrow. The result 
returned is