[jira] [Created] (SPARK-44520) Remove UNSUPPORTED_DATA_SOURCE_FOR_DIRECT_QUERY in favor of UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY
Kent Yao created SPARK-44520: Summary: Remove UNSUPPORTED_DATA_SOURCE_FOR_DIRECT_QUERY in favor of UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY Key: SPARK-44520 URL: https://issues.apache.org/jira/browse/SPARK-44520 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.0 Reporter: Kent Yao -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44519) SparkConnectServerUtils generated incorrect parameters for jars
jiaan.geng created SPARK-44519: -- Summary: SparkConnectServerUtils generated incorrect parameters for jars Key: SPARK-44519 URL: https://issues.apache.org/jira/browse/SPARK-44519 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: jiaan.geng SparkConnectServerUtils generates multiple --jars parameters, which causes a bug where the required classes cannot be found. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
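For context, spark-submit expects a single --jars flag whose value is a comma-separated list; repeating the flag typically makes only the last value take effect. A minimal sketch of the difference (illustrative only; the jar names are made up and this is not the actual SparkConnectServerUtils code):
{code:python}
# Illustrative sketch, not the real SparkConnectServerUtils logic.
# spark-submit expects ONE --jars flag with a comma-separated value; if the
# flag is repeated, later occurrences override earlier ones, so earlier jars
# are silently dropped and their classes cannot be found.
jars = ["spark-connect-server.jar", "extra-udfs.jar"]  # hypothetical jar names

# Buggy shape: ['--jars', 'spark-connect-server.jar', '--jars', 'extra-udfs.jar']
buggy_args = [token for jar in jars for token in ("--jars", jar)]

# Correct shape: ['--jars', 'spark-connect-server.jar,extra-udfs.jar']
fixed_args = ["--jars", ",".join(jars)]

print(buggy_args)
print(fixed_args)
{code}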
[jira] [Updated] (SPARK-44514) Optimize join if maximum number of rows on one side is 1
[ https://issues.apache.org/jira/browse/SPARK-44514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang updated SPARK-44514: Summary: Optimize join if maximum number of rows on one side is 1 (was: Rewrite the join to filter if one side maximum number of rows is 1) > Optimize join if maximum number of rows on one side is 1 > > > Key: SPARK-44514 > URL: https://issues.apache.org/jira/browse/SPARK-44514 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuming Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
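As a rough illustration of the idea behind this ticket (my own sketch based on the summary, not the rule implemented in the linked PR): when one join side is guaranteed to produce at most one row, the join is logically equivalent to comparing against a single scalar value, so it can be evaluated as a filter instead of a full join.
{code:python}
# Hedged sketch of the query shape this optimization targets; not the actual
# optimizer rule. The right side of the join is a global aggregate, so its
# maximum number of rows is 1.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.range(10).createOrReplaceTempView("t1")

# Join form: the subquery yields at most one row.
joined = spark.sql("""
    SELECT t1.id
    FROM t1
    JOIN (SELECT max(id) AS m FROM t1) s ON t1.id = s.m
""")

# Filter form that the join is equivalent to.
filtered = spark.sql("""
    SELECT id FROM t1 WHERE id = (SELECT max(id) FROM t1)
""")

joined.explain()
filtered.explain()
{code}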
[jira] (SPARK-40588) Sorting issue with partitioned-writing and AQE turned on
[ https://issues.apache.org/jira/browse/SPARK-40588 ] Yiu-Chung Lee deleted comment on SPARK-40588: --- was (Author: JIRAUSER301473): I discovered a similar sort-then-partitionBy issue in Spark 3.4.1 in another use case, mentioned in SPARK-44512. > Sorting issue with partitioned-writing and AQE turned on > > > Key: SPARK-40588 > URL: https://issues.apache.org/jira/browse/SPARK-40588 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.3 > Environment: Spark v3.1.3 > Scala v2.12.13 >Reporter: Swetha Baskaran >Assignee: Enrico Minack >Priority: Major > Labels: correctness > Fix For: 3.2.3, 3.3.2 > > Attachments: image-2022-10-16-22-05-47-159.png > > > We are attempting to partition data by a few columns, sort by a particular > _sortCol_ and write out one file per partition. > {code:java} > df > .repartition(col("day"), col("month"), col("year")) > .withColumn("partitionId",spark_partition_id) > .withColumn("monotonicallyIncreasingIdUnsorted",monotonicallyIncreasingId) > .sortWithinPartitions("year", "month", "day", "sortCol") > .withColumn("monotonicallyIncreasingIdSorted",monotonicallyIncreasingId) > .write > .partitionBy("year", "month", "day") > .parquet(path){code} > When inspecting the results, we observe one file per partition; however, we > see an _alternating_ pattern of unsorted rows in some files. > {code:java} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}{code} > Here is a > [gist|https://gist.github.com/Swebask/543030748a768be92d3c0ae343d2ae89] to > reproduce the issue. > Turning off AQE with spark.conf.set("spark.sql.adaptive.enabled", false) > fixes the issue. > I'm working on identifying why AQE affects the sort order. Any leads or > thoughts would be appreciated! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44518) Completely make hive as a data source
[ https://issues.apache.org/jira/browse/SPARK-44518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746191#comment-17746191 ] He Qi commented on SPARK-44518: --- [~LuciferYang] [~yumwang] [~Qin Yao] [~csun] WDYT? > Completely make hive as a data source > - > > Key: SPARK-44518 > URL: https://issues.apache.org/jira/browse/SPARK-44518 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: He Qi >Priority: Major > Fix For: 4.0.0 > > > Today, Hive is treated differently from every other data source. Within the > Spark project, Hive carries a lot of special-case logic that adds maintenance > cost. In Presto, by contrast, Hive is just another connector. Is it possible > to make Hive a regular data source entirely? > I know it is very difficult; there are many historical and compatibility > problems. Could we reduce these problems as much as possible if we release > 4.0? > I want to start a discussion to collect suggestions, and any suggestion is > welcome. I feel 4.0 is a good opportunity to discuss this issue; if I am > wrong, please point it out. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44518) Completely make hive as a data source
[ https://issues.apache.org/jira/browse/SPARK-44518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] He Qi updated SPARK-44518: -- Shepherd: (was: Yang Jie) > Completely make hive as a data source > - > > Key: SPARK-44518 > URL: https://issues.apache.org/jira/browse/SPARK-44518 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: He Qi >Priority: Major > Fix For: 4.0.0 > > > Today, Hive is treated differently from every other data source. Within the > Spark project, Hive carries a lot of special-case logic that adds maintenance > cost. In Presto, by contrast, Hive is just another connector. Is it possible > to make Hive a regular data source entirely? > I know it is very difficult; there are many historical and compatibility > problems. Could we reduce these problems as much as possible if we release > 4.0? > I want to start a discussion to collect suggestions, and any suggestion is > welcome. I feel 4.0 is a good opportunity to discuss this issue; if I am > wrong, please point it out. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44518) Completely make hive as a data source
[ https://issues.apache.org/jira/browse/SPARK-44518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] He Qi updated SPARK-44518: -- Shepherd: Yang Jie (was: yu) > Completely make hive as a data source > - > > Key: SPARK-44518 > URL: https://issues.apache.org/jira/browse/SPARK-44518 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: He Qi >Priority: Major > Fix For: 4.0.0 > > > Today, Hive is treated differently from every other data source. Within the > Spark project, Hive carries a lot of special-case logic that adds maintenance > cost. In Presto, by contrast, Hive is just another connector. Is it possible > to make Hive a regular data source entirely? > I know it is very difficult; there are many historical and compatibility > problems. Could we reduce these problems as much as possible if we release > 4.0? > I want to start a discussion to collect suggestions, and any suggestion is > welcome. I feel 4.0 is a good opportunity to discuss this issue; if I am > wrong, please point it out. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44518) Completely make hive as a data source
[ https://issues.apache.org/jira/browse/SPARK-44518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] He Qi updated SPARK-44518: -- Shepherd: yu (was: Yang Jie) > Completely make hive as a data source > - > > Key: SPARK-44518 > URL: https://issues.apache.org/jira/browse/SPARK-44518 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 3.5.0 >Reporter: He Qi >Priority: Major > Fix For: 4.0.0 > > > Today, Hive is treated differently from every other data source. Within the > Spark project, Hive carries a lot of special-case logic that adds maintenance > cost. In Presto, by contrast, Hive is just another connector. Is it possible > to make Hive a regular data source entirely? > I know it is very difficult; there are many historical and compatibility > problems. Could we reduce these problems as much as possible if we release > 4.0? > I want to start a discussion to collect suggestions, and any suggestion is > welcome. I feel 4.0 is a good opportunity to discuss this issue; if I am > wrong, please point it out. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44514) Rewrite the join to filter if one side maximum number of rows is 1
[ https://issues.apache.org/jira/browse/SPARK-44514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746185#comment-17746185 ] Snoot.io commented on SPARK-44514: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/42114 > Rewrite the join to filter if one side maximum number of rows is 1 > -- > > Key: SPARK-44514 > URL: https://issues.apache.org/jira/browse/SPARK-44514 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuming Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44518) Completely make hive as a data source
He Qi created SPARK-44518: - Summary: Completely make hive as a data source Key: SPARK-44518 URL: https://issues.apache.org/jira/browse/SPARK-44518 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 3.5.0 Reporter: He Qi Fix For: 4.0.0 Today, Hive is treated differently from every other data source. Within the Spark project, Hive carries a lot of special-case logic that adds maintenance cost. In Presto, by contrast, Hive is just another connector. Is it possible to make Hive a regular data source entirely? I know it is very difficult; there are many historical and compatibility problems. Could we reduce these problems as much as possible if we release 4.0? I want to start a discussion to collect suggestions, and any suggestion is welcome. I feel 4.0 is a good opportunity to discuss this issue; if I am wrong, please point it out. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44493) Extract pushable predicates from disjunctive predicates
[ https://issues.apache.org/jira/browse/SPARK-44493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746175#comment-17746175 ] Snoot.io commented on SPARK-44493: -- User 'wangyum' has created a pull request for this issue: https://github.com/apache/spark/pull/42112 > Extract pushable predicates from disjunctive predicates > --- > > Key: SPARK-44493 > URL: https://issues.apache.org/jira/browse/SPARK-44493 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuming Wang >Priority: Major > Attachments: after.png, before.png > > > Example: > {code:sql} > select count(*) > from > db.very_large_table > where > session_start_dt between date_sub('2023-07-15', 1) and > date_add('2023-07-16', 1) > and type = 'event' > and date(event_timestamp) between '2023-07-15' and '2023-07-16' > and ( > ( > page_id in (2627, 2835, 2402999) > -- other predicates elided > and rdt = 0 > ) or ( > page_id in (2616, 3411350) > and rdt = 0 > ) or ( > page_id = 2403006 > ) or ( > page_id in (2208336, 2356359) > -- other predicates elided > and rdt = 0 > ) > ) > {code} > We can push down {{page_id in (2627, 2835, 2402999, 2616, 3411350, 2403006, > 2208336, 2356359)}} to the data source. > Before: > !before.png! > After: > !after.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
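To make the extraction idea concrete, a toy sketch over a made-up expression encoding (this is not the Catalyst rule from the PR): a predicate implied by every branch of an OR is implied by the whole OR, so the union of the per-branch page_id sets can be conjoined onto the query and pushed down.
{code:python}
# Toy illustration of extracting a pushable predicate from a disjunction.
# NOT Catalyst code; the expression encoding is a made-up stand-in.
# Each OR branch is a list of atoms: ("in", col, values) or ("eq", col, value).
# If every branch constrains `col`, the union of the branch value sets is
# implied by the whole OR and is safe to push down as `col IN (union)`.

def extract_pushable_in(branches, col):
    union = set()
    for atoms in branches:
        values = set()
        for kind, c, v in atoms:
            if c != col:
                continue
            values |= set(v) if kind == "in" else {v}
        if not values:      # one branch does not constrain `col`,
            return None     # so nothing can be extracted safely
        union |= values
    return ("in", col, sorted(union))

branches = [
    [("in", "page_id", [2627, 2835, 2402999]), ("eq", "rdt", 0)],
    [("in", "page_id", [2616, 3411350]), ("eq", "rdt", 0)],
    [("eq", "page_id", 2403006)],
    [("in", "page_id", [2208336, 2356359]), ("eq", "rdt", 0)],
]
print(extract_pushable_in(branches, "page_id"))
# ('in', 'page_id', [2616, 2627, 2835, 2208336, 2356359, 2402999, 2403006, 3411350])
{code}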
[jira] [Commented] (SPARK-44033) Support list-like for binary ops
[ https://issues.apache.org/jira/browse/SPARK-44033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746167#comment-17746167 ] Haejoon Lee commented on SPARK-44033: - > Do we need to implement other binary ops just like this? Correct. Basically, we try to reuse the PySpark functions from the JVM side internally, as in [https://github.com/apache/spark/blob/515dfc166a95ecc8decce0f0cd99e06fe395f94f/python/pyspark/sql/column.py#L277C1-L277C1]. But in cases where we can't leverage that, like this one, we should manually implement it in Python code, as in [https://github.com/apache/spark/blob/46440a4a542148bc05b8c0f80d1860e6380efdb6/python/pyspark/pandas/data_type_ops/base.py#L394C1-L394C1]. > Support list-like for binary ops > > > Key: SPARK-44033 > URL: https://issues.apache.org/jira/browse/SPARK-44033 > Project: Spark > Issue Type: Bug > Components: Pandas API on Spark >Affects Versions: 3.5.0 >Reporter: Haejoon Lee >Priority: Major > > We should fix the error below: > {code:python} > >>> pser = pd.Series([1, 2, 3, 4, 5, 6], name="x") > >>> psser = ps.from_pandas(pser) > >>> other = [np.nan, 1, 3, 4, np.nan, 6] > >>> psser <= other > Traceback (most recent call last): > File "<stdin>", line 1, in <module> > File > "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/base.py", > line 412, in __le__ > return self._dtype_op.le(self, other) > File > "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/data_type_ops/num_ops.py", > line 242, in le > _sanitize_list_like(right) > File > "/Users/haejoon.lee/Desktop/git_store/spark/python/pyspark/pandas/data_type_ops/base.py", > line 199, in _sanitize_list_like > raise TypeError("The operation can not be applied to %s." % > type(operand).__name__) > TypeError: The operation can not be applied to list.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
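For illustration, one possible shape of a fix (an assumption, not the merged implementation): coerce the plain list to a pandas-on-Spark Series and compare Series-to-Series.
{code:python}
# Hedged sketch, not the merged fix: coerce the plain list to a pandas-on-Spark
# Series and compare Series-to-Series. Combining series from different frames
# needs the ops_on_diff_frames option; a real fix inside data_type_ops could
# instead attach the literal values to the same anchor frame.
import numpy as np
import pandas as pd
import pyspark.pandas as ps

ps.set_option("compute.ops_on_diff_frames", True)

pser = pd.Series([1, 2, 3, 4, 5, 6], name="x")
psser = ps.from_pandas(pser)
other = [np.nan, 1.0, 3.0, 4.0, np.nan, 6.0]

result = psser <= ps.Series(other)  # works where `psser <= other` raised TypeError
print(result.sort_index())
{code}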
[jira] [Assigned] (SPARK-44513) Upgrade snappy-java to 1.1.10.3
[ https://issues.apache.org/jira/browse/SPARK-44513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kent Yao reassigned SPARK-44513: Assignee: BingKun Pan > Upgrade snappy-java to 1.1.10.3 > --- > > Key: SPARK-44513 > URL: https://issues.apache.org/jira/browse/SPARK-44513 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 4.0.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Trivial > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-44513) Upgrade snappy-java to 1.1.10.3
[ https://issues.apache.org/jira/browse/SPARK-44513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kent Yao resolved SPARK-44513. -- Fix Version/s: 3.5.0 4.0.0 Resolution: Fixed Issue resolved by pull request 42113 [https://github.com/apache/spark/pull/42113] > Upgrade snappy-java to 1.1.10.3 > --- > > Key: SPARK-44513 > URL: https://issues.apache.org/jira/browse/SPARK-44513 > Project: Spark > Issue Type: Improvement > Components: Build >Affects Versions: 4.0.0 >Reporter: BingKun Pan >Assignee: BingKun Pan >Priority: Trivial > Fix For: 3.5.0, 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44517) first operator should respect the nullability of child expression as well as ignoreNulls option
Nan Zhu created SPARK-44517: --- Summary: first operator should respect the nullability of child expression as well as ignoreNulls option Key: SPARK-44517 URL: https://issues.apache.org/jira/browse/SPARK-44517 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.1, 3.4.0, 3.3.2, 3.2.4, 3.2.3, 3.3.1, 3.2.2, 3.3.0, 3.2.1, 3.2.0 Reporter: Nan Zhu I found the following problem when using Spark recently: {code:scala} import spark.implicits._ import org.apache.spark.sql.types._ val s = Seq((1.2, "s", 2.2)).toDF("v1", "v2", "v3") val schema = StructType(Seq(StructField("v1", DoubleType, nullable = false), StructField("v2", StringType, nullable = true), StructField("v3", DoubleType, nullable = false))) val df = spark.createDataFrame(s.rdd, schema) val inputDF = df.dropDuplicates("v3") spark.sql("CREATE TABLE local.db.table (\n v1 DOUBLE NOT NULL,\n v2 STRING, v3 DOUBLE NOT NULL)") inputDF.write.mode("overwrite").format("iceberg").save("local.db.table") {code} When I use the above code to write to Iceberg (I guess Delta Lake will have the same problem), I get a very confusing exception: {code:java} Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema: table { 1: v1: required double 2: v2: optional string 3: v3: required double} Provided schema: table { 1: v1: optional double 2: v2: optional string 3: v3: required double} {code} Basically it complains that v1 is a nullable column in our `inputDF` above, which is not allowed since we created the table with v1 as not nullable. The confusion comes from the fact that if we check the schema of inputDF with printSchema(), v1 is not nullable: {noformat} root |-- v1: double (nullable = false) |-- v2: string (nullable = true) |-- v3: double (nullable = false){noformat} Clearly, something changed v1's nullability unexpectedly! After some debugging I found that the key is the dropDuplicates("v3") call. In the optimization phase, ReplaceDeduplicateWithAggregate replaces the Deduplicate with an aggregate on v3 that runs first() over all other columns. However, the first() operator hard-codes nullable as always "true", which is the source of v1's changed nullability. This is very confusing behavior, and probably no one really noticed before, because nullability mattered little until the new table formats like Delta Lake and Iceberg, which enforce nullability checks correctly, appeared. Nowadays users adopt them more and more, so this has surfaced. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
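For anyone who wants to see the nullability flip in isolation, a small PySpark demo (my own sketch; it relies on the reported behavior that first()'s result is always marked nullable, which is the aggregate that ReplaceDeduplicateWithAggregate rewrites dropDuplicates into):
{code:python}
# Small demo of the reported behavior: aggregating a non-nullable column with
# first() reports the result as nullable.
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("v1", DoubleType(), nullable=False),
    StructField("v2", StringType(), nullable=True),
    StructField("v3", DoubleType(), nullable=False),
])
df = spark.createDataFrame([(1.2, "s", 2.2)], schema)

df.printSchema()  # v1: nullable = false
df.groupBy("v3").agg(F.first("v1").alias("v1")).printSchema()  # v1: nullable = true
df.dropDuplicates(["v3"]).printSchema()  # still false here; per the report, the
                                         # flip only happens after optimization
{code}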
[jira] [Resolved] (SPARK-44510) Update dataTables to 1.13.5 and remove some unreached png files
[ https://issues.apache.org/jira/browse/SPARK-44510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kent Yao resolved SPARK-44510. -- Fix Version/s: 3.5.0 4.0.0 Resolution: Fixed Issue resolved by pull request 42108 [https://github.com/apache/spark/pull/42108] > Update dataTables to 1.13.5 and remove some unreached png files > --- > > Key: SPARK-44510 > URL: https://issues.apache.org/jira/browse/SPARK-44510 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 3.5.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > Fix For: 3.5.0, 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-44510) Update dataTables to 1.13.5 and remove some unreached png files
[ https://issues.apache.org/jira/browse/SPARK-44510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kent Yao reassigned SPARK-44510: Assignee: Kent Yao > Update dataTables to 1.13.5 and remove some unreached png files > --- > > Key: SPARK-44510 > URL: https://issues.apache.org/jira/browse/SPARK-44510 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 3.5.0 >Reporter: Kent Yao >Assignee: Kent Yao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44516) Spark Connect Python StreamingQueryListener removeListener method
Wei Liu created SPARK-44516: --- Summary: Spark Connect Python StreamingQueryListener removeListener method Key: SPARK-44516 URL: https://issues.apache.org/jira/browse/SPARK-44516 Project: Spark Issue Type: New Feature Components: Connect, Structured Streaming Affects Versions: 3.5.0, 4.0.0 Reporter: Wei Liu -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
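For context, a sketch of the listener API surface this ticket targets (assuming the Spark Connect client mirrors the existing PySpark streaming listener API; the listener class below is a made-up example):
{code:python}
# Hedged sketch of the API involved; assumes the Connect client mirrors the
# classic pyspark.sql.streaming listener surface.
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

spark = SparkSession.builder.getOrCreate()

class MyListener(StreamingQueryListener):  # hypothetical example listener
    def onQueryStarted(self, event):
        print("started:", event.id)
    def onQueryProgress(self, event):
        print("progress:", event.progress.batchId)
    def onQueryTerminated(self, event):
        print("terminated:", event.id)

listener = MyListener()
spark.streams.addListener(listener)     # addListener already exists
spark.streams.removeListener(listener)  # the method this ticket adds for Connect
{code}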
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Description: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce sorted output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write()}} {{.partitionBy("_2")}} {{.text("output")}} Below is the complete code that reproduces the problem. was: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce sorted output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write()}} {{.partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Labels: correctness > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce sorted > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that reproduces the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-40588) Sorting issue with partitioned-writing and AQE turned on
[ https://issues.apache.org/jira/browse/SPARK-40588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746147#comment-17746147 ] Yiu-Chung Lee commented on SPARK-40588: --- I discovered a similar sort-then-partitionBy issue in Spark 3.4.1 in another use case, mentioned in SPARK-44512. > Sorting issue with partitioned-writing and AQE turned on > > > Key: SPARK-40588 > URL: https://issues.apache.org/jira/browse/SPARK-40588 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 3.1.3 > Environment: Spark v3.1.3 > Scala v2.12.13 >Reporter: Swetha Baskaran >Assignee: Enrico Minack >Priority: Major > Labels: correctness > Fix For: 3.2.3, 3.3.2 > > Attachments: image-2022-10-16-22-05-47-159.png > > > We are attempting to partition data by a few columns, sort by a particular > _sortCol_ and write out one file per partition. > {code:java} > df > .repartition(col("day"), col("month"), col("year")) > .withColumn("partitionId",spark_partition_id) > .withColumn("monotonicallyIncreasingIdUnsorted",monotonicallyIncreasingId) > .sortWithinPartitions("year", "month", "day", "sortCol") > .withColumn("monotonicallyIncreasingIdSorted",monotonicallyIncreasingId) > .write > .partitionBy("year", "month", "day") > .parquet(path){code} > When inspecting the results, we observe one file per partition; however, we > see an _alternating_ pattern of unsorted rows in some files. > {code:java} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832121344,"monotonicallyIncreasingIdSorted":6287832121344} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877022389,"monotonicallyIncreasingIdSorted":6287876860586} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287877567881,"monotonicallyIncreasingIdSorted":6287832121345} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287835105553,"monotonicallyIncreasingIdSorted":6287876860587} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832570127,"monotonicallyIncreasingIdSorted":6287832121346} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287879965760,"monotonicallyIncreasingIdSorted":6287876860588} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287878762347,"monotonicallyIncreasingIdSorted":6287832121347} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287837165012,"monotonicallyIncreasingIdSorted":6287876860589} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287832910545,"monotonicallyIncreasingIdSorted":6287832121348} > {"sortCol":1303413,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287881244758,"monotonicallyIncreasingIdSorted":6287876860590} > {"sortCol":10,"partitionId":732,"monotonicallyIncreasingIdUnsorted":6287880041345,"monotonicallyIncreasingIdSorted":6287832121349}{code} > Here is a > [gist|https://gist.github.com/Swebask/543030748a768be92d3c0ae343d2ae89] to > reproduce the issue. > Turning off AQE with spark.conf.set("spark.sql.adaptive.enabled", false) > fixes the issue. > I'm working on identifying why AQE affects the sort order. Any leads or > thoughts would be appreciated! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Labels: correctness (was: ) > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Labels: correctness > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce sorted > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44514) Rewrite the join to filter if one side maximum number of rows is 1
[ https://issues.apache.org/jira/browse/SPARK-44514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746143#comment-17746143 ] Yuming Wang commented on SPARK-44514: - https://github.com/apache/spark/pull/42114 > Rewrite the join to filter if one side maximum number of rows is 1 > -- > > Key: SPARK-44514 > URL: https://issues.apache.org/jira/browse/SPARK-44514 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Yuming Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44515) Code Improvement: PySpark add util function to set python version
Wei Liu created SPARK-44515: --- Summary: Code Improvement: PySpark add util function to set python version Key: SPARK-44515 URL: https://issues.apache.org/jira/browse/SPARK-44515 Project: Spark Issue Type: New Feature Components: PySpark Affects Versions: 3.5.0, 4.0.0 Reporter: Wei Liu Searching for `"%d.%d" % sys.version_info[:2]` turns up multiple occurrences of this pattern; we should create a separate util method for it. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
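A minimal sketch of what such a helper could look like (the function name and placement are assumptions, not the actual patch):
{code:python}
# Hypothetical helper; the name and module are assumptions for illustration.
import sys

def format_python_version() -> str:
    """Return the interpreter version as 'major.minor', e.g. '3.11'."""
    return "%d.%d" % sys.version_info[:2]

# Call sites would then use format_python_version() instead of repeating the
# "%d.%d" % sys.version_info[:2] expression.
print(format_python_version())
{code}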
[jira] [Commented] (SPARK-33782) Place spark.files, spark.jars and spark.files under the current working directory on the driver in K8S cluster mode
[ https://issues.apache.org/jira/browse/SPARK-33782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746127#comment-17746127 ] Szymon Kuryło commented on SPARK-33782: --- [~pratik.malani] I have a similar problem, and thus a question: why doesn't your destination path contain the randomly named directory? It should be created inside the `KubernetesUtils.uploadFileUri` method. > Place spark.files, spark.jars and spark.files under the current working > directory on the driver in K8S cluster mode > --- > > Key: SPARK-33782 > URL: https://issues.apache.org/jira/browse/SPARK-33782 > Project: Spark > Issue Type: Bug > Components: Kubernetes >Affects Versions: 3.2.0 >Reporter: Hyukjin Kwon >Assignee: Pralabh Kumar >Priority: Major > Fix For: 3.4.0 > > > In YARN cluster mode, the passed files can be accessed in the > current working directory. It looks like this is not the case in Kubernetes > cluster mode. > By doing this, users can, for example, leverage PEX to manage Python > dependencies in Apache Spark: > {code} > pex pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 -o myarchive.pex > PYSPARK_PYTHON=./myarchive.pex spark-submit --files myarchive.pex > {code} > See also https://github.com/apache/spark/pull/30735/files#r540935585. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-36277) Issue with record count of data frame while reading in DropMalformed mode
[ https://issues.apache.org/jira/browse/SPARK-36277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746120#comment-17746120 ] Jose Santos commented on SPARK-36277: -- I know this issue was created 2 years ago, but if anyone is interested, I got a workaround working by using pandas on a very small dataset (for a unit test). Something like: {code:python} def test_counts_99_rows(local_csv_data_ignore_last_no_write): local_csv_data_ignore_last_no_write.df.show(200) # result = local_csv_data_ignore_last_no_write.df.count() # This has a known bug # The bug can be seen here: https://issues.apache.org/jira/browse/SPARK-36277 result = len(local_csv_data_ignore_last_no_write.df.toPandas()) expected = 99 assert result == expected # local_csv_data_ignore_last_no_write is just a function that has a pytest decorator: @pytest.fixture(scope="module", autouse=True) # The function itself reads the CSV file (100 lines, but one is not really read, though the count() still returns 100){code} > Issue with record count of data frame while reading in DropMalformed mode > - > > Key: SPARK-36277 > URL: https://issues.apache.org/jira/browse/SPARK-36277 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 >Reporter: anju >Priority: Major > Attachments: 111.PNG, Inputfile.PNG, sample.csv > > > I am writing the steps to reproduce the issue with the "count" PySpark API > when using mode 'DROPMALFORMED'. > I have a sample CSV file in an S3 bucket. I read the CSV file with the PySpark > CSV API, once "without schema" and once "with schema using mode > 'DROPMALFORMED'", into two different dataframes. When displaying the "with > schema using mode 'DROPMALFORMED'" dataframe, the display looks good: it does > not show the malformed records. But when we apply the count API on the > dataframe, it returns the record count of the actual file. I expect it to > return the count of valid records. > Here is the code used: > {code} > without_schema_df=spark.read.csv("s3://noa-poc-lakeformation/data/test_files/sample.csv",header=True) > schema = StructType([ \ > StructField("firstname",StringType(),True), \ > StructField("middlename",StringType(),True), \ > StructField("lastname",StringType(),True), \ > StructField("id", StringType(), True), \ > StructField("gender", StringType(), True), \ > StructField("salary", IntegerType(), True) \ > ]) > with_schema_df = > spark.read.csv("s3://noa-poc-lakeformation/data/test_files/sample.csv",header=True,schema=schema,mode="DROPMALFORMED") > print("The dataframe with schema") > with_schema_df.show() > print("The dataframe without schema") > without_schema_df.show() > cnt_with_schema=with_schema_df.count() > print("The records count from with schema df :"+str(cnt_with_schema)) > cnt_without_schema=without_schema_df.count() > print("The records count from without schema df: "+str(cnt_without_schema)) > {code} > Here is the output: 111.PNG is a screenshot of the outputs of the code, and > inputfile.csv is the input to the code. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44514) Rewrite the join to filter if one side maximum number of rows is 1
Yuming Wang created SPARK-44514: --- Summary: Rewrite the join to filter if one side maximum number of rows is 1 Key: SPARK-44514 URL: https://issues.apache.org/jira/browse/SPARK-44514 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 4.0.0 Reporter: Yuming Wang -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44513) Upgrade snappy-java to 1.1.10.3
BingKun Pan created SPARK-44513: --- Summary: Upgrade snappy-java to 1.1.10.3 Key: SPARK-44513 URL: https://issues.apache.org/jira/browse/SPARK-44513 Project: Spark Issue Type: Improvement Components: Build Affects Versions: 4.0.0 Reporter: BingKun Pan -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Description: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce sorted output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write()}} {{.partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. was: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write()}} {{.partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce sorted > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Description: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write()}} {{.partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. was: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write().partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce an > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Description: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset = dataset.sort("_1")}} {{.select("_2", "_3");}} {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write().partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. was: (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write().partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce an > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write().partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746025#comment-17746025 ] Yiu-Chung Lee commented on SPARK-44512: --- [^Test.java] (Attached the code) To compile: javac Test.java && jar cvf Test.jar Test.class. To reproduce the bug: spark-submit --class Test Test.jar. No bug if the workaround is enabled: spark-submit --class Test Test.jar workaround. No bug either if AQE is disabled: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar (3 output files in each partition key). > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 > Environment: Code that replicates the problem: > > import java.io.IOException; > import java.util.List; > import java.util.function.Function; > import org.apache.hadoop.fs.FileSystem; > import org.apache.spark.api.java.function.MapFunction; > import org.apache.spark.sql.Encoders; > import org.apache.spark.sql.Row; > import org.apache.spark.sql.SparkSession; > import scala.Tuple3; > // To compile: javac Test.java && jar cvf Test.jar Test.class > // bug: spark-submit --class Test Test.jar > // no bug: spark-submit --class Test Test.jar workaround > // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test > Test.jar (3 output files in each partition key) > public class Test { > public static void main(String args[]) throws IOException { > final var spark = SparkSession > .builder() > .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", > "false") > .getOrCreate(); > final var hadoopConf = spark.sparkContext().hadoopConfiguration(); > final var fs = FileSystem.get(hadoopConf); > fs.setWriteChecksum(false); > // create a minimal dataset that is enough to reproduce the bug > // The three columns are named _1, _2, and _3 (the field names of > Tuple3) > var dataset = spark.createDataset(List.of( > new Tuple3<>(3L, "a", "r"), > new Tuple3<>(3L, "b", "r"), > new Tuple3<>(2L, "b", "q"), > new Tuple3<>(2L, "a", "q"), > new Tuple3<>(1L, "a", "p"), > new Tuple3<>(1L, "b", "p") > ), > Encoders.tuple(Encoders.LONG(), Encoders.STRING(), > Encoders.STRING())) > .sort("_1") > .select("_2", "_3"); > // This is an identity mapper, i.e. returns itself > // Enabled by adding an argument "workaround" when executing > spark-submit. > // With AQE enabled, .sort() will work as intended only if this > identity mapper > // is inserted between .select() and .write() in the pipeline > if (args.length > 0 && args[0].equals("workaround")) { > dataset = dataset.map((MapFunction<Row, Row>) row -> row, > dataset.encoder()); > } > // output column _3 to text files, partitioned by column _2 > // _1 is only for sorting purpose, not used in output > // output will not be sorted without the identity mapper > dataset.write() > .mode("overwrite") > .partitionBy("_2") > .text("output"); > dataset.explain(); > spark.close(); > } > } >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce an > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write().partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Environment: (was: Code that replicates the problem: import java.io.IOException; import java.util.List; import java.util.function.Function; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple3; // To compile: javac Test.java && jar cvf Test.jar Test.class // bug: spark-submit --class Test Test.jar // no bug: spark-submit --class Test Test.jar workaround // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar (3 output files in each partition key) public class Test { public static void main(String args[]) throws IOException { final var spark = SparkSession .builder() .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") .getOrCreate(); final var hadoopConf = spark.sparkContext().hadoopConfiguration(); final var fs = FileSystem.get(hadoopConf); fs.setWriteChecksum(false); // create a minimal dataset that is enough to reproduce the bug // The three columns are named _1, _2, and _3 (the field names of Tuple3) var dataset = spark.createDataset(List.of( new Tuple3<>(3L, "a", "r"), new Tuple3<>(3L, "b", "r"), new Tuple3<>(2L, "b", "q"), new Tuple3<>(2L, "a", "q"), new Tuple3<>(1L, "a", "p"), new Tuple3<>(1L, "b", "p") ), Encoders.tuple(Encoders.LONG(), Encoders.STRING(), Encoders.STRING())) .sort("_1") .select("_2", "_3"); // This is an identity mapper, i.e. returns itself // Enabled by adding an argument "workaround" when executing spark-submit. // With AQE enabled, .sort() will work as intended only if this identity mapper // is inserted between .select() and .write() in the pipeline if (args.length > 0 && args[0].equals("workaround")) { dataset = dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder()); } // output column _3 to text files, partitioned by column _2 // _1 is only for sorting purpose, not used in output // output will not be sorted without the identity mapper dataset.write() .mode("overwrite") .partitionBy("_2") .text("output"); dataset.explain(); spark.close(); } }) > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce sorted > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset = dataset.sort("_1")}} > {{.select("_2", "_3");}} > {{dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
Yiu-Chung Lee created SPARK-44512: - Summary: dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled Key: SPARK-44512 URL: https://issues.apache.org/jira/browse/SPARK-44512 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.1 Environment: Code that replicates the problem: import java.io.IOException; import java.util.List; import java.util.function.Function; import org.apache.hadoop.fs.FileSystem; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import scala.Tuple3; // To compile: javac Test.java && jar cvf Test.jar Test.class // bug: spark-submit --class Test Test.jar // no bug: spark-submit --class Test Test.jar workaround // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test Test.jar (3 output files in each partition key) public class Test { public static void main(String args[]) throws IOException { final var spark = SparkSession .builder() .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") .getOrCreate(); final var hadoopConf = spark.sparkContext().hadoopConfiguration(); final var fs = FileSystem.get(hadoopConf); fs.setWriteChecksum(false); // create a minimal dataset that is enough to reproduce the bug // The three columns are named _1, _2, and _3 (the field names of Tuple3) var dataset = spark.createDataset(List.of( new Tuple3<>(3L, "a", "r"), new Tuple3<>(3L, "b", "r"), new Tuple3<>(2L, "b", "q"), new Tuple3<>(2L, "a", "q"), new Tuple3<>(1L, "a", "p"), new Tuple3<>(1L, "b", "p") ), Encoders.tuple(Encoders.LONG(), Encoders.STRING(), Encoders.STRING())) .sort("_1") .select("_2", "_3"); // This is an identity mapper, i.e. returns itself // Enabled by adding an argument "workaround" when executing spark-submit. // With AQE enabled, .sort() will work as intended only if this identity mapper // is inserted between .select() and .write() in the pipeline if (args.length > 0 && args[0].equals("workaround")) { dataset = dataset.map((MapFunction<Row, Row>) row -> row, dataset.encoder()); } // output column _3 to text files, partitioned by column _2 // _1 is only for sorting purpose, not used in output // output will not be sorted without the identity mapper dataset.write() .mode("overwrite") .partitionBy("_2") .text("output"); dataset.explain(); spark.close(); } } Reporter: Yiu-Chung Lee Attachments: Test.java (In this example the dataset is of type Tuple3, and the columns are named _1, _2 and _3) I found that when AQE is enabled, the following code does not produce an output {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.write()}} {{.partitionBy("_2")}} {{.text("output");}} However, if I insert an identity mapper between select and write, the output would be sorted as expected. {{dataset.sort("_1")}} {{.select("_2", "_3")}} {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} {{.write().partitionBy("_2")}} {{.text("output")}} Below is the complete code that replicates the problem. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44512) dataset.sort.select.write.partitionBy does not return a sorted output if AQE is enabled
[ https://issues.apache.org/jira/browse/SPARK-44512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yiu-Chung Lee updated SPARK-44512: -- Attachment: Test.java > dataset.sort.select.write.partitionBy does not return a sorted output if AQE > is enabled > --- > > Key: SPARK-44512 > URL: https://issues.apache.org/jira/browse/SPARK-44512 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.4.1 > Environment: Code that replicates the problem: > > import java.io.IOException; > import java.util.List; > import java.util.function.Function; > import org.apache.hadoop.fs.FileSystem; > import org.apache.spark.api.java.function.MapFunction; > import org.apache.spark.sql.Encoders; > import org.apache.spark.sql.Row; > import org.apache.spark.sql.SparkSession; > import scala.Tuple3; > // To compile: javac Test.java && jar cvf Test.jar Test.class > // bug: spark-submit --class Test Test.jar > // no bug: spark-submit --class Test Test.jar workaround > // no bug: spark-submit --conf spark.sql.adaptive.enabled=false --class Test > Test.jar (3 output files in each partition key) > public class Test { > public static void main(String args[]) throws IOException { > final var spark = SparkSession > .builder() > .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", > "false") > .getOrCreate(); > final var hadoopConf = spark.sparkContext().hadoopConfiguration(); > final var fs = FileSystem.get(hadoopConf); > fs.setWriteChecksum(false); > // create a minimal dataset that is enough to reproduce the bug > // The three columns are named _1, _2, and _3 (the field names of > Tuple3) > var dataset = spark.createDataset(List.of( > new Tuple3<>(3L, "a", "r"), > new Tuple3<>(3L, "b", "r"), > new Tuple3<>(2L, "b", "q"), > new Tuple3<>(2L, "a", "q"), > new Tuple3<>(1L, "a", "p"), > new Tuple3<>(1L, "b", "p") > ), > Encoders.tuple(Encoders.LONG(), Encoders.STRING(), > Encoders.STRING())) > .sort("_1") > .select("_2", "_3"); > // This is an identity mapper, i.e. returns itself > // Enabled by adding an argument "workaround" when executing > spark-submit. > // With AQE enabled, .sort() will work as intended only if this > identity mapper > // is inserted between .select() and .write() in the pipeline > if (args.length > 0 && args[0].equals("workaround")) { > dataset = dataset.map((MapFunction<Row, Row>) row -> row, > dataset.encoder()); > } > // output column _3 to text files, partitioned by column _2 > // _1 is only for sorting purpose, not used in output > // output will not be sorted without the identity mapper > dataset.write() > .mode("overwrite") > .partitionBy("_2") > .text("output"); > dataset.explain(); > spark.close(); > } > } >Reporter: Yiu-Chung Lee >Priority: Major > Attachments: Test.java > > > (In this example the dataset is of type Tuple3, and the columns are named _1, > _2 and _3) > > I found that when AQE is enabled, the following code does not produce an > output > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.write()}} > {{.partitionBy("_2")}} > {{.text("output");}} > > However, if I insert an identity mapper between select and write, the output > would be sorted as expected. > {{dataset.sort("_1")}} > {{.select("_2", "_3")}} > {{.map((MapFunction<Row, Row>) row -> row, dataset.encoder())}} > {{.write().partitionBy("_2")}} > {{.text("output")}} > Below is the complete code that replicates the problem.
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org