[jira] [Commented] (SPARK-41454) Support Python 3.11
[ https://issues.apache.org/jira/browse/SPARK-41454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692802#comment-17692802 ] Kevin Appel commented on SPARK-41454:

[~dongjoon] thank you for working on this; looking forward to being able to run on Python 3.11.

> Support Python 3.11
> ---
>
> Key: SPARK-41454
> URL: https://issues.apache.org/jira/browse/SPARK-41454
> Project: Spark
> Issue Type: Improvement
> Components: PySpark
> Affects Versions: 3.4.0
> Reporter: Dongjoon Hyun
> Assignee: Dongjoon Hyun
> Priority: Major
> Fix For: 3.4.0

--
This message was sent by Atlassian Jira (v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-40801) Upgrade Apache Commons Text to 1.10
[ https://issues.apache.org/jira/browse/SPARK-40801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17644430#comment-17644430 ] Kevin Appel commented on SPARK-40801:

Thank you for working on this.

> Upgrade Apache Commons Text to 1.10
> ---
>
> Key: SPARK-40801
> URL: https://issues.apache.org/jira/browse/SPARK-40801
> Project: Spark
> Issue Type: Dependency upgrade
> Components: Build
> Affects Versions: 3.4.0
> Reporter: Bjørn Jørgensen
> Assignee: Bjørn Jørgensen
> Priority: Minor
> Fix For: 3.2.3, 3.3.2, 3.4.0
>
> [CVE-2022-42889|https://nvd.nist.gov/vuln/detail/CVE-2022-42889]
[jira] [Commented] (SPARK-31001) Add ability to create a partitioned table via catalog.createTable()
[ https://issues.apache.org/jira/browse/SPARK-31001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598582#comment-17598582 ] Kevin Appel commented on SPARK-31001:

I am trying to get SparkR and sparklyr to work with this and did some more testing. It turns out that for PySpark you don't actually need to pass the partition columns, at least not in Spark 3.2.1; recovering the partitions picks them all up. There is also a builtin for doing the recovery:

spark.catalog.createTable("kevin.ktest2", "/user/kevin/ktest2")
spark.catalog.recoverPartitions("kevin.ktest2")

SparkR has similar functions:

createTable("kevin.ktest3", path="/user/kevin/ktest3")
recoverPartitions("kevin.ktest3")

I don't see an easy way to do this in sparklyr. Maybe this is the official way?

> Add ability to create a partitioned table via catalog.createTable()
> ---
>
> Key: SPARK-31001
> URL: https://issues.apache.org/jira/browse/SPARK-31001
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.1.0
> Reporter: Nicholas Chammas
> Priority: Minor
>
> There doesn't appear to be a way to create a partitioned table using the
> Catalog interface.
> In SQL, however, you can do this via {{CREATE TABLE ... PARTITIONED BY}}.
[jira] [Comment Edited] (SPARK-31001) Add ability to create a partitioned table via catalog.createTable()
[ https://issues.apache.org/jira/browse/SPARK-31001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17598397#comment-17598397 ] Kevin Appel edited comment on SPARK-31001 at 8/31/22 2:14 PM:

It is defined in here: https://github.com/apache/spark/blob/55ee406df9933ca522bc98c2d2ccc0245e97ff67/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala

/** The key to use for storing partitionBy columns as options. */
val PARTITIONING_COLUMNS_KEY = "__partition_columns"

I started off doing something like:

df.write.partitionBy("id").option("path", "/user/kevin/ktest1").saveAsTable("kevin.ktest1")

just to see if this works, and it does; somehow the DataFrame carries the partition-by information in its schema and passes it to saveAsTable, which is able to create the external table correctly. Then inside https://github.com/apache/spark/blob/36dd531a93af55ce5c2bfd8d275814ccb2846962/python/pyspark/sql/catalog.py#L705 there is an extra item: **options : dict, optional extra options to specify in the table. I started to look for partition options and I found, from the Delta link:

.option("__partition_columns", """["join_dim_date_id"]""")

From there I built that into a dictionary and passed it into the function, and it worked to declare the schema correctly with the partition-by; the second command then goes and scans all the partitions, and after that it seems to be working.

Something like this is also working:

spark.catalog.createTable("kevin.ktest1", "/user/kevin/ktest1", __partition_columns="['id']")
spark.sql("alter table kevin.ktest1 recover partitions")

Whether or not this is the right go-forward solution, hopefully some of the Spark experts can chime in; this __ variable name is meant for private variables.
[jira] [Commented] (SPARK-31001) Add ability to create a partitioned table via catalog.createTable()
[ https://issues.apache.org/jira/browse/SPARK-31001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17597941#comment-17597941 ] Kevin Appel commented on SPARK-31001:

[~nchammas] I ran into this recently trying to create an external table that is partitioned, and I found a bunch of items, but nothing worked easily without having to manually extract the DDL and remove the partition column. In https://github.com/delta-io/delta/issues/31, vprus posted an example that uses that option and includes the partition columns. I tried this, and combined with the second item below it is working for me to create the external table in Spark over existing parquet data. Since the table is then registered, I am also able to view it in Trino.

spark.catalog.createTable("kevin.ktest1", "/user/kevin/ktest1", **{"__partition_columns":"['id']"})
spark.sql("alter table kevin.ktest1 recover partitions")

Can you see if this is working for you as well? If it is, possibly we can get the Spark documentation updated to cover this usage.
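As a side note, a minimal plain-Python sketch of building that options dict: serializing the column list with json.dumps produces the JSON-array form seen in the Delta example ("""["join_dim_date_id"]"""). The table name and path are the commenter's examples, and actually creating the table needs a live SparkSession; only the dict construction runs standalone.

```python
import json

# Partition columns to record; "__partition_columns" is the internal
# PARTITIONING_COLUMNS_KEY defined in DataSourceUtils.scala.
partition_cols = ["id"]
options = {"__partition_columns": json.dumps(partition_cols)}

# With a SparkSession available, this would be passed as keyword options:
# spark.catalog.createTable("kevin.ktest1", "/user/kevin/ktest1", **options)
# spark.sql("alter table kevin.ktest1 recover partitions")

print(options)  # -> {'__partition_columns': '["id"]'}
```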
[jira] [Commented] (SPARK-37259) JDBC read is always going to wrap the query in a select statement
[ https://issues.apache.org/jira/browse/SPARK-37259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17525784#comment-17525784 ] Kevin Appel commented on SPARK-37259:

Both of these PRs were closed due to inactivity. I had tested them both and they both appeared to be working. Is it possible they could get reactivated and one of them pushed to the end? Or is there someone new who could help take this to the end and get it into the main Spark code base? I am working on Spark 3.2.1 now and have recompiled the code to do the above items; we have also been using this code for a few months on Spark 3.1.2, and it is working for us to handle CTE queries against MSSQL.

> JDBC read is always going to wrap the query in a select statement
> ---
>
> Key: SPARK-37259
> URL: https://issues.apache.org/jira/browse/SPARK-37259
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.2
> Reporter: Kevin Appel
> Priority: Major
>
> The read jdbc is wrapping the query it sends to the database server inside a
> select statement and there is no way to override this currently.
> Initially I ran into this issue when trying to run a CTE query against SQL
> server and it fails; the details of the failure are in these cases:
> [https://github.com/microsoft/mssql-jdbc/issues/1340]
> [https://github.com/microsoft/mssql-jdbc/issues/1657]
> [https://github.com/microsoft/sql-spark-connector/issues/147]
> https://issues.apache.org/jira/browse/SPARK-32825
> https://issues.apache.org/jira/browse/SPARK-34928
> I started to patch the code to get the query to run and ran into a few
> different items; if there is a way to add these features to allow this code
> path to run, it would be extremely helpful for running these types of edge
> case queries. These are basic examples here; the actual queries are much more
> complex and would require significant time to rewrite.
> Inside JDBCOptions.scala the query is being set to either of the following; using dbtable allows the query to be passed without modification:
>
> {code:java}
> name.trim
> or
> s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
> {code}
>
> Inside JDBCRelation.scala this is going to try to get the schema for this query, and this ends up running dialect.getSchemaQuery, which is doing:
>
> {code:java}
> s"SELECT * FROM $table WHERE 1=0"
> {code}
>
> Overriding the dialect here and initially just passing back the $table gets you past here and on to the next issue, which is in the compute function in JDBCRDD.scala:
>
> {code:java}
> val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" +
>   s" $myWhereClause $getGroupByClause $myLimitClause"
> {code}
>
> For these two queries, a CTE query and one using temp tables, finding out the schema is difficult without actually running the query, and for the temp table, if you run it during the schema check, the table will then exist and the actual query will fail when it runs.
>
> The way I patched these is by doing these two items:
>
> JDBCRDD.scala (compute)
>
> {code:java}
> val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean
> val sqlText = if (runQueryAsIs) {
>   s"${options.tableOrQuery}"
> } else {
>   s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause"
> }
> {code}
>
> JDBCRelation.scala (getSchema)
>
> {code:java}
> val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean
> if (useCustomSchema) {
>   val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "").toString
>   val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema)
>   logInfo(s"Going to return the new $newSchema because useCustomSchema is $useCustomSchema and passed in $myCustomSchema")
>   newSchema
> } else {
>   val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
>   jdbcOptions.customSchema match {
>     case Some(customSchema) => JdbcUtils.getCustomSchema(
>       tableSchema, customSchema, resolver)
>     case None => tableSchema
>   }
> }
> {code}
>
> This is allowing the query to run as is, by using the dbtable option, and then providing a custom schema that will bypass the dialect schema check.
>
> Test queries
>
> {code:java}
> query1 = """
> SELECT 1 as DummyCOL
> """
> query2 = """
> WITH DummyCTE AS
> (
> SELECT 1 as DummyCOL
> )
> SELECT *
> FROM DummyCTE
> """
> query3 = """
> (SELECT *
> INTO #Temp1a
> FROM
> (SELECT @@VERSION as version) data
> )
> (SELECT *
> FROM
> #Temp1a)
> """
> {code}
>
> Test schema
>
> {code:java}
> schema1 = """
> DummyXCOL INT
> """
> schema2 = """
> DummyXCOL STRING
> """
> {code}
>
> Test code
>
[jira] [Updated] (SPARK-38571) Week of month from a date is missing in spark3 for return values of 1 to 6
[ https://issues.apache.org/jira/browse/SPARK-38571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Appel updated SPARK-38571:

Description:

In Spark 2 we could use the date_format function with either the W or F flag to compute the week of month from a date. These compute two different things: W has values from 1 to 6 and F has values from 1 to 5.

Sample code and expected output:

df1 = spark.createDataFrame(
    [
        (1, date(2014, 3, 7)),
        (2, date(2014, 3, 8)),
        (3, date(2014, 3, 30)),
        (4, date(2014, 3, 31)),
        (5, date(2015, 3, 7)),
        (6, date(2015, 3, 8)),
        (7, date(2015, 3, 30)),
        (8, date(2015, 3, 31)),
    ],
    schema="a long, b date",
)
df1 = df1.withColumn("WEEKOFMONTH1-6", F.date_format(F.col("b"), "W"))
df1 = df1.withColumn("WEEKOFMONTH1-5", F.date_format(F.col("b"), "F"))
df1.show()

+---+----------+--------------+--------------+
|  a|         b|WEEKOFMONTH1-6|WEEKOFMONTH1-5|
+---+----------+--------------+--------------+
|  1|2014-03-07|             2|             1|
|  2|2014-03-08|             2|             2|
|  3|2014-03-30|             6|             5|
|  4|2014-03-31|             6|             5|
|  5|2015-03-07|             1|             1|
|  6|2015-03-08|             2|             2|
|  7|2015-03-30|             5|             5|
|  8|2015-03-31|             5|             5|
+---+----------+--------------+--------------+

With Spark 3 having spark.sql.legacy.timeParserPolicy set to EXCEPTION by default, this throws an error:

Caused by: java.lang.IllegalArgumentException: All week-based patterns are unsupported since Spark 3.0, detected: W, Please use the SQL function EXTRACT instead

However, the EXTRACT function has nothing available that extracts the week of month with values 1 to 6. The Spark 3 docs mention they define their own patterns, located at [https://spark.apache.org/docs/3.2.1/sql-ref-datetime-pattern.html], which are implemented via DateTimeFormatter under the hood: [https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html]. That site lists both W and F for week of month:

W week-of-month number 4
F week-of-month number 3

However, only F is implemented in the Spark datetime pattern reference. Is there another way we can compute this week of month for values 1 to 6 while still using the builtins in Spark 3? Currently we have to set spark.sql.legacy.timeParserPolicy to LEGACY in order to run this.

Thank you, Kevin
[jira] [Created] (SPARK-38571) Week of month from a date is missing in spark3 for return values of 1 to 6
Kevin Appel created SPARK-38571:

Summary: Week of month from a date is missing in spark3 for return values of 1 to 6
Key: SPARK-38571
URL: https://issues.apache.org/jira/browse/SPARK-38571
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.1.2
Reporter: Kevin Appel
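As a possible workaround (a sketch, not from the ticket): the 1-to-6 W value can be derived from the day of month plus the weekday of the first day of the month, assuming Sunday is treated as the first day of the week. The plain-Python version below reproduces the expected-output table; the same arithmetic could be written in PySpark with dayofmonth, dayofweek, and trunc.

```python
from datetime import date

def week_of_month_1_to_6(d: date) -> int:
    """W-style week of month (1-6), assuming Sunday is the first day of the week."""
    first = d.replace(day=1)
    # Convert Python's Monday=0..Sunday=6 to Spark's dayofweek Sunday=1..Saturday=7
    dow_first = (first.weekday() + 1) % 7 + 1
    return (d.day + dow_first - 2) // 7 + 1

def week_of_month_1_to_5(d: date) -> int:
    """F-style aligned week of month (1-5): days 1-7 -> 1, days 8-14 -> 2, ..."""
    return (d.day - 1) // 7 + 1

# Matches the expected-output table in the issue description:
print(week_of_month_1_to_6(date(2014, 3, 30)))  # -> 6
print(week_of_month_1_to_5(date(2014, 3, 30)))  # -> 5
print(week_of_month_1_to_6(date(2015, 3, 7)))   # -> 1
```

Note that W's exact values depend on locale settings (first day of week and minimal days in first week) in java.time, so this sketch only matches the Sunday-first behavior shown in the sample output.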
[jira] [Commented] (SPARK-37259) JDBC read is always going to wrap the query in a select statement
[ https://issues.apache.org/jira/browse/SPARK-37259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450525#comment-17450525 ] Kevin Appel commented on SPARK-37259:

[~petertoth] [~akhalymon] Thank you both for working on these patches. It took me a little bit to figure out how to test them, but I got Spark 3.3.0-SNAPSHOT compiled, added each of your changes to a different working copy, recompiled spark-sql, and was then able to test both changes. I added comments in the GitHub pull requests on how the testing went so far.
[jira] [Commented] (SPARK-37259) JDBC read is always going to wrap the query in a select statement
[ https://issues.apache.org/jira/browse/SPARK-37259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447578#comment-17447578 ] Kevin Appel commented on SPARK-37259:

It would be difficult to actually split the query up into its parts and align one of the selects to match the one hard-coded in the wrapper; there is also the issue of needing to patch into the dialect and handle how it passes that query today to get the schema, and having a way to get that without running the query twice. The other query, which uses temp tables (in SQL Server either #temptable or ##temptable), is also still an issue because of how it gets wrapped in the select; similarly, if the schema check runs the query, it actually creates the tables, and the real query then fails when it runs since the table already exists. The other item is that Spark is going to do something to the query you pass in, so it would need to be based on dbtable being used, which only does a trim; the query option wraps it as:

s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
[jira] [Updated] (SPARK-37259) JDBC read is always going to wrap the query in a select statement
[ https://issues.apache.org/jira/browse/SPARK-37259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Appel updated SPARK-37259: Description: The JDBC read path wraps the query it sends to the database server inside a SELECT statement, and there is currently no way to override this. I initially ran into this issue when trying to run a CTE query against SQL Server, where it fails; the details of the failures are in these issues: [https://github.com/microsoft/mssql-jdbc/issues/1340] [https://github.com/microsoft/mssql-jdbc/issues/1657] [https://github.com/microsoft/sql-spark-connector/issues/147] https://issues.apache.org/jira/browse/SPARK-32825 https://issues.apache.org/jira/browse/SPARK-34928 I started to patch the code to get the query to run and hit a few different items; if these features could be added to allow this code path to run, it would be extremely helpful for running these types of edge-case queries. The examples here are basic; the actual queries are much more complex and would require significant time to rewrite. 
Inside JDBCOptions.scala the table-or-query value is set to one of the following (using dbtable lets the query pass through without modification): {code:java} name.trim or s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}" {code} Inside JDBCRelation.scala Spark then tries to get the schema for this query, which ends up running dialect.getSchemaQuery, i.e.: {code:java} s"SELECT * FROM $table WHERE 1=0"{code} Overriding the dialect to just pass $table back gets past this check and leads to the next issue, in the compute function in JDBCRDD.scala: {code:java} val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + s" $myWhereClause $getGroupByClause $myLimitClause" {code} For these two query shapes (a CTE and a temp table), determining the schema is difficult without actually running the query; and for the temp table, running it during the schema check creates the table, so the actual query then fails because the table already exists. 
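The wrapping described above can be sketched in a few lines of plain Python. This is a simplified model of what JDBCOptions/the dialect do, not Spark's actual code, but it shows why a CTE breaks during the schema probe:

```python
# Simplified sketch (not Spark's actual code) of how a JDBC query is
# wrapped: the user query becomes a parenthesized subquery with a
# generated alias, and the schema probe wraps it once more.
def wrap_query(subquery, cur_id=0):
    return f"({subquery}) SPARK_GEN_SUBQ_{cur_id}"

def get_schema_query(table):
    return f"SELECT * FROM {table} WHERE 1=0"

cte = "WITH DummyCTE AS (SELECT 1 as DummyCOL) SELECT * FROM DummyCTE"
# The resulting probe puts the WITH clause inside a derived table,
# which T-SQL rejects -- this is why the CTE query fails.
probe = get_schema_query(wrap_query(cte))
```

Running this, `probe` contains a WITH clause nested inside a parenthesized subquery, which SQL Server's parser will not accept.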
I patched these with the following two changes: JDBCRDD.scala (compute) {code:java} val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean val sqlText = if (runQueryAsIs) { s"${options.tableOrQuery}" } else { s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" } {code} JDBCRelation.scala (getSchema) {code:java} val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean if (useCustomSchema) { val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "").toString val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema) logInfo(s"Going to return the new $newSchema because useCustomSchema is $useCustomSchema and passed in $myCustomSchema") newSchema } else { val tableSchema = JDBCRDD.resolveTable(jdbcOptions) jdbcOptions.customSchema match { case Some(customSchema) => JdbcUtils.getCustomSchema( tableSchema, customSchema, resolver) case None => tableSchema } }{code} This allows the query to run as-is via the dbtable option while providing a custom schema that bypasses the dialect schema check. Test queries {code:java} query1 = """ SELECT 1 as DummyCOL """ query2 = """ WITH DummyCTE AS ( SELECT 1 as DummyCOL ) SELECT * FROM DummyCTE """ query3 = """ (SELECT * INTO #Temp1a FROM (SELECT @@VERSION as version) data ) (SELECT * FROM #Temp1a) """ {code} Test schema {code:java} schema1 = """ DummyXCOL INT """ schema2 = """ DummyXCOL STRING """ {code} Test code {code:java} jdbcDFWorking = ( spark.read.format("jdbc") .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") .option("user", user) .option("password", password) .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("dbtable", queryx) .option("customSchema", schemax) .option("useCustomSchema", "true") .option("runQueryAsIs", "true") .load() ) {code} Currently we ran into this on these two special SQL Server queries; however, we aren't sure if there are other DB's we 
are using that we haven't hit this type of issue with yet; before going through this I didn't realize the query is always wrapped in the SELECT no matter what you do. This is on Spark 3.1.2, using PySpark with Python 3.7.11. Thank you for your consideration and assistance in finding a way to fix this. Kevin was: The read jdbc is wrapping the query it sends to the database server inside a select statement and there is no way to override this currently. Initially I ran into this issue when trying to run a CTE query against SQL server and it fails, the details of the failure is in these cases:
[jira] [Created] (SPARK-37259) JDBC read is always going to wrap the query in a select statement
Kevin Appel created SPARK-37259: --- Summary: JDBC read is always going to wrap the query in a select statement Key: SPARK-37259 URL: https://issues.apache.org/jira/browse/SPARK-37259 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.2 Reporter: Kevin Appel The JDBC read path wraps the query it sends to the database server inside a SELECT statement, and there is currently no way to override this. I initially ran into this issue when trying to run a CTE query against SQL Server, where it fails; the details of the failures are in these issues: [https://github.com/microsoft/mssql-jdbc/issues/1340] [https://github.com/microsoft/mssql-jdbc/issues/1657] [https://github.com/microsoft/sql-spark-connector/issues/147] https://issues.apache.org/jira/browse/SPARK-32825 https://issues.apache.org/jira/browse/SPARK-34928 I started to patch the code to get the query to run and hit a few different items; if these features could be added to allow this code path to run, it would be extremely helpful for running these types of edge-case queries. The examples here are basic; the actual queries are much more complex and would require significant time to rewrite. 
Inside JDBCOptions.scala the table-or-query value is set to one of the following (using dbtable lets the query pass through without modification): {code:java} name.trim or s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}" {code} Inside JDBCRelation.scala Spark then tries to get the schema for this query, which ends up running dialect.getSchemaQuery, i.e.: {code:java} s"SELECT * FROM $table WHERE 1=0"{code} Overriding the dialect to just pass $table back gets past this check and leads to the next issue, in the compute function in JDBCRDD.scala: {code:java} val sqlText = s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" + s" $myWhereClause $getGroupByClause $myLimitClause" {code} For these two query shapes (a CTE and a temp table), determining the schema is difficult without actually running the query; and for the temp table, running it during the schema check creates the table, so the actual query then fails because the table already exists. 
I patched these with the following two changes: JDBCRDD.scala (compute) {code:java} val runQueryAsIs = options.parameters.getOrElse("runQueryAsIs", "false").toBoolean val sqlText = if (runQueryAsIs) { s"${options.tableOrQuery}" } else { s"SELECT $columnList FROM ${options.tableOrQuery} $myWhereClause" } {code} JDBCRelation.scala (getSchema) {code:java} val useCustomSchema = jdbcOptions.parameters.getOrElse("useCustomSchema", "false").toBoolean if (useCustomSchema) { val myCustomSchema = jdbcOptions.parameters.getOrElse("customSchema", "").toString val newSchema = CatalystSqlParser.parseTableSchema(myCustomSchema) logInfo(s"Going to return the new $newSchema because useCustomSchema is $useCustomSchema and passed in $myCustomSchema") newSchema } else { val tableSchema = JDBCRDD.resolveTable(jdbcOptions) jdbcOptions.customSchema match { case Some(customSchema) => JdbcUtils.getCustomSchema( tableSchema, customSchema, resolver) case None => tableSchema } }{code} This allows the query to run as-is via the dbtable option while providing a custom schema that bypasses the dialect schema check. Test queries {code:java} query1 = """ SELECT 1 as DummyCOL """ query2 = """ WITH DummyCTE AS ( SELECT 1 as DummyCOL ) SELECT * FROM DummyCTE """ query3 = """ (SELECT * INTO #Temp1a FROM (SELECT @@VERSION as version) data ) (SELECT * FROM #Temp1a) """ {code} Test schema {code:java} schema1 = """ DummyXCOL INT """ schema2 = """ DummyXCOL STRING """ {code} Test code {code:java} jdbcDFWorking = ( spark.read.format("jdbc") .option("url", f"jdbc:sqlserver://{server}:{port};databaseName={database};") .option("user", user) .option("password", password) .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") .option("dbtable", queryx) .option("customSchema", schemax) .option("useCustomSchema", "true") .option("runQueryAsIs", "true") .load() ) {code} Currently we ran into this on these two special SQL Server queries; however, we aren't sure if there are other DB's we 
are using that we haven't hit this type of issue with yet; before going through this I didn't realize the query is always wrapped in the SELECT no matter what you do. This is on Spark 3.1.2, using PySpark with Python 3.7.11. Thank you for your consideration and assistance in finding a way to fix this. Kevin -- This message was sent by Atlassian Jira (v8.20.1#820001)
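As an aside for readers hitting the same problem on newer Spark: my understanding is that the resolution of this ticket added a prepareQuery JDBC option (Spark 3.4+, MS SQL Server dialect) that is prepended to the generated query, so a CTE can be supplied without patching Spark. Treat the option name and the version as assumptions to verify against your release's JDBC data source documentation; the connection variables below are hypothetical placeholders:

```python
# Hedged sketch: assumes the Spark 3.4+ "prepareQuery" JDBC option;
# verify the option name and availability against your Spark version.
# url/user/password are hypothetical placeholders for a real connection.
df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    # The CTE prefix is kept out of the wrapped subquery...
    .option("prepareQuery", "WITH DummyCTE AS (SELECT 1 as DummyCOL)")
    # ...while the body goes through the normal query path.
    .option("query", "SELECT * FROM DummyCTE")
    .load()
)
```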
[jira] [Comment Edited] (SPARK-30961) Arrow enabled: to_pandas with date column fails
[ https://issues.apache.org/jira/browse/SPARK-30961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17052464#comment-17052464 ] Kevin Appel edited comment on SPARK-30961 at 3/5/20, 10:19 PM: --- (python 3.6, pyarrow 0.8.0, pandas 0.21.0) or (python 3.7, pyarrow 0.11.1, pandas 0.24.1) are combinations I found that are still working correctly for Date in both Spark 2.3 and Spark 2.4; in addition, all the examples listed in the pandas udf Spark documentation also work with this setup was (Author: kevinappel): python 3.6, pyarrow 0.8.0, pandas 0.21.0 is a combination I found that is still working correctly for Date in both Spark 2.3 and Spark 2.4, in additional all the examples listed on the pandas udf spark documentation also works with this setup > Arrow enabled: to_pandas with date column fails > --- > > Key: SPARK-30961 > URL: https://issues.apache.org/jira/browse/SPARK-30961 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.5 > Environment: Apache Spark 2.4.5 >Reporter: Nicolas Renkamp >Priority: Major > Labels: ready-to-commit > > Hi, > there seems to be a bug in the arrow-enabled to_pandas conversion from spark > dataframe to pandas dataframe when the dataframe has a column of type > DateType. 
Here is a minimal example to reproduce the issue: > {code:java} > spark = SparkSession.builder.getOrCreate() > is_arrow_enabled = spark.conf.get("spark.sql.execution.arrow.enabled") > print("Arrow optimization is enabled: " + is_arrow_enabled) > spark_df = spark.createDataFrame( > [['2019-12-06']], 'created_at: string') \ > .withColumn('created_at', F.to_date('created_at')) > # works > spark_df.toPandas() > spark.conf.set("spark.sql.execution.arrow.enabled", 'true') > is_arrow_enabled = spark.conf.get("spark.sql.execution.arrow.enabled") > print("Arrow optimization is enabled: " + is_arrow_enabled) > # raises AttributeError: Can only use .dt accessor with datetimelike values > # series is still of type object, .dt does not exist > spark_df.toPandas(){code} > A fix would be to modify the _check_series_convert_date function in > pyspark.sql.types to: > {code:java} > def _check_series_convert_date(series, data_type): > """ > Cast the series to datetime.date if it's a date type, otherwise returns > the original series. > :param series: pandas.Series > :param data_type: a Spark data type for the series > """ > from pyspark.sql.utils import require_minimum_pandas_version > require_minimum_pandas_version() > from pandas import to_datetime > if type(data_type) == DateType: > return to_datetime(series).dt.date > else: > return series > {code} > Let me know if I should prepare a Pull Request for the 2.4.5 branch. > I have not tested the behavior on master branch. > > Thanks, > Nicolas -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
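The essence of the proposed fix above is that when the Spark column is DateType but the values arrived as strings (the object-dtype case in pandas), they must be cast to datetime.date explicitly. A small pure-Python analogue of the idea (a sketch, not the pyspark function itself; the function name and list-based signature are illustrative only):

```python
from datetime import date

def convert_date_values(values, is_date_type):
    # Pure-Python analogue of the proposed _check_series_convert_date fix:
    # if the logical type is a date, cast string values to datetime.date
    # explicitly instead of relying on pandas' .dt accessor, which fails
    # on object-dtype series; otherwise return the values unchanged.
    if not is_date_type:
        return values
    return [date.fromisoformat(v) if isinstance(v, str) else v for v in values]

converted = convert_date_values(["2019-12-06"], True)
```

After the call, `converted` holds real date objects rather than strings, which is what the downstream conversion expects.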
[jira] [Commented] (SPARK-30961) Arrow enabled: to_pandas with date column fails
[ https://issues.apache.org/jira/browse/SPARK-30961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17052464#comment-17052464 ] Kevin Appel commented on SPARK-30961: - python 3.6, pyarrow 0.8.0, pandas 0.21.0 is a combination I found that is still working correctly for Date in both Spark 2.3 and Spark 2.4; in addition, all the examples listed in the pandas udf Spark documentation also work with this setup > Arrow enabled: to_pandas with date column fails > --- > > Key: SPARK-30961 > URL: https://issues.apache.org/jira/browse/SPARK-30961 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.5 > Environment: Apache Spark 2.4.5 >Reporter: Nicolas Renkamp >Priority: Major > Labels: ready-to-commit > > Hi, > there seems to be a bug in the arrow-enabled to_pandas conversion from spark > dataframe to pandas dataframe when the dataframe has a column of type > DateType. Here is a minimal example to reproduce the issue: > {code:java} > spark = SparkSession.builder.getOrCreate() > is_arrow_enabled = spark.conf.get("spark.sql.execution.arrow.enabled") > print("Arrow optimization is enabled: " + is_arrow_enabled) > spark_df = spark.createDataFrame( > [['2019-12-06']], 'created_at: string') \ > .withColumn('created_at', F.to_date('created_at')) > # works > spark_df.toPandas() > spark.conf.set("spark.sql.execution.arrow.enabled", 'true') > is_arrow_enabled = spark.conf.get("spark.sql.execution.arrow.enabled") > print("Arrow optimization is enabled: " + is_arrow_enabled) > # raises AttributeError: Can only use .dt accessor with datetimelike values > # series is still of type object, .dt does not exist > spark_df.toPandas(){code} > A fix would be to modify the _check_series_convert_date function in > pyspark.sql.types to: > {code:java} > def _check_series_convert_date(series, data_type): > """ > Cast the series to datetime.date if it's a date type, otherwise returns > the original series. > :param series: pandas.Series > :param 
data_type: a Spark data type for the series > """ > from pyspark.sql.utils import require_minimum_pandas_version > require_minimum_pandas_version() > from pandas import to_datetime > if type(data_type) == DateType: > return to_datetime(series).dt.date > else: > return series > {code} > Let me know if I should prepare a Pull Request for the 2.4.5 branch. > I have not tested the behavior on master branch. > > Thanks, > Nicolas -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-27169) number of active tasks is negative on executors page
[ https://issues.apache.org/jira/browse/SPARK-27169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16934424#comment-16934424 ] Kevin Appel commented on SPARK-27169: - I have run into a similar issue before on jobs with many thousands of tasks: the events are getting dropped somewhere and the UI shows gaps or anomalies in the metrics, such as stages that don't appear to be completed or executors showing negative metrics. Through trial and error, using the following now gives reliable metrics: --conf spark.scheduler.listenerbus.eventqueue.size=20 > number of active tasks is negative on executors page > > > Key: SPARK-27169 > URL: https://issues.apache.org/jira/browse/SPARK-27169 > Project: Spark > Issue Type: Bug > Components: Web UI >Affects Versions: 2.3.2 >Reporter: acupple >Priority: Minor > Attachments: QQ20190315-102215.png, QQ20190315-102235.png, > image-2019-03-19-15-17-25-522.png, image-2019-03-19-15-21-03-766.png, > job_1924.log, stage_3511.log > > > I use spark to process some data in HDFS and HBASE; I use one thread to consume > messages from a queue and then submit to a thread pool (16 fixed size) for the spark > processor. > But after running for some time, the active jobs number in the thousands, and the number of > active tasks is negative. > Actually, these jobs are already done when I check the driver logs. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
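For readers applying the workaround in the comment above, a sketch of setting the queue size when building a session. Hedged assumptions: in Spark 2.3+ the property was renamed to spark.scheduler.listenerbus.eventqueue.capacity (the .size spelling is the pre-2.3 name), and the value shown is a placeholder to tune, not a recommendation:

```python
from pyspark.sql import SparkSession  # requires pyspark on the machine

# Sketch: enlarge the listener bus event queue so UI/metrics events are
# not dropped under heavy task churn. Property name assumed for Spark
# 2.3+ is "...eventqueue.capacity"; "20000" is a placeholder value.
spark = (
    SparkSession.builder
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    .getOrCreate()
)
```

Dropped events are usually also visible in the driver log as warnings from the listener bus, which is a quicker signal than waiting for the UI metrics to drift.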