[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-06-23 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368550#comment-17368550
 ] 

Nasir Ali commented on SPARK-33863:
---

[~dc-heros] Could you please share the output you got when you ran the above 
code?
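
In case it helps with the comparison, a small probe along these lines should 
show what the Python worker actually receives next to its OS time zone (this is 
only a sketch that reuses the df from the snippet below; the tz_probe name is a 
placeholder, and it assumes a plain Python UDF gets the timestamp as a naive 
datetime rendered in the worker's OS time zone):

{code:python}
# Diagnostic sketch: echo the timestamp exactly as the Python worker sees it,
# together with the worker's OS time zone name, so it can be compared with the
# ts column that df.show() prints.
import time

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.udf(StringType())
def tz_probe(ts):
    # ts arrives as a plain datetime.datetime with no tzinfo attached
    return "{} (worker TZ: {})".format(ts.isoformat(sep=" "), time.tzname[0])

df.withColumn("probe", tz_probe("ts")).show(truncate=False)
{code}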

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1, 3.1.2
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-06-23 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali reopened SPARK-33863:
---

Issue is not resolved. 

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1, 3.1.2
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-06-23 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Affects Version/s: 3.1.2

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1, 3.1.2
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-05-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Affects Version/s: 3.0.2
   3.1.0
   3.1.1

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-05-07 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341075#comment-17341075
 ] 

Nasir Ali commented on SPARK-33863:
---

[~hyukjin.kwon] and [~viirya] This bug exists in all 3.x versions of 
PySpark. Any update or suggestion?
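
In the meantime, keeping the logic on the Spark side should sidestep the 
conversion entirely. Below is a sketch of that workaround (untested here; it 
assumes the built-in column functions such as hour() are evaluated under the 
session time zone from spark.sql.session.timeZone, and it reuses the df from 
the repro):

{code:python}
# Workaround sketch: build the day-part label with column expressions instead
# of a Python UDF, so the timestamp never has to be converted for Python.
from pyspark.sql import functions as F

hour = F.hour("ts")
ts_str = F.col("ts").cast("string")

day_part = (
    F.when((hour >= 5) & (hour < 12), F.concat(F.lit("Morning: "), ts_str))
     .when((hour >= 12) & (hour < 17), F.concat(F.lit("Afternoon: "), ts_str))
     .when((hour >= 17) & (hour < 21), F.lit("Evening"))
     .otherwise(F.lit("Night"))
)

df.withColumn("day_part", day_part).show(truncate=False)
{code}

If that assumption holds, the labels should match the ts column as rendered by 
df.show().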

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-02-22 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288802#comment-17288802
 ] 

Nasir Ali commented on SPARK-33863:
---

[~hyukjin.kwon] and [~viirya] any update on this issue?

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it. 
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType, TimestampType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
> def some_time_udf(i):
> if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
> tmp= "Morning: " + str(i)
> elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
> tmp= "Afternoon: " + str(i)
> elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
> tmp= "Evening"
> elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
> tmp= "Night"
> elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
> tmp= "Night"
> return tmp
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
>  
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])

df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
tmp= "Morning: " + str(i)
elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
tmp= "Afternoon: " + str(i)
elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
tmp= "Evening"
elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
tmp= "Night"
elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
tmp= "Night"
return tmp

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+----------------------------+
|user|id   |ts                 |day_part                    |
+----+-----+-------------------+----------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
+----+-----+-------------------+----------------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:

 
{code:java}
+----+-----+-------------------+------------------------------+
|user|id   |ts                 |day_part                      |
+----+-----+-------------------+------------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
|usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
+----+-----+-------------------+------------------------------+{code}
 

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp. 

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),

[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])

df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
tmp= "Morning: " + str(i)
elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
tmp= str(i)
elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
tmp= "Evening"
elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
tmp= "Night"
elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
tmp= "Night"
return tmp

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+----------------------------+
|user|id   |ts                 |day_part                    |
+----+-----+-------------------+----------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
+----+-----+-------------------+----------------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:

 
{code:java}
+----+-----+-------------------+------------------------------+
|user|id   |ts                 |day_part                      |
+----+-----+-------------------+------------------------------+
|usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
|usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
|usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
+----+-----+-------------------+------------------------------+{code}
 

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp. 

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),

[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])

df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
tmp= "Morning: " + str(i)
elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
tmp= str(i)
elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
tmp= "Evening"
elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
tmp= "Night"
elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
tmp= "Night"
return tmp

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
+----+-----+-------------------+-------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:

 
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
+----+-----+-------------------+-------------------+{code}
If I change the return type to TimestampType then 'day_part' will have the 
correct timestamp.

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp. 

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
  

[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.show(truncate=False)

def some_time_udf(i):
return str(i)

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)

{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
+----+-----+-------------------+-------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:

 
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
+----+-----+-------------------+-------------------+{code}
If I change the return type to TimestampType then 'day_part' will have the 
correct timestamp.

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp.

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.show(truncate=False)

def some_time_udf(i):
return str(i)

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)

{code}
 

Below is the output of 

[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.show(truncate=False)

def some_time_udf(i):
return str(i)

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)

{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
+----+-----+-------------------+-------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
+----+-----+-------------------+-------------------+
{code}

  was:
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using udf, pyspark udf wrongly changes timestamps into UTC time. ts 
(timestamp) column is already in UTC time. Therefore, pyspark udf should not 
convert ts (timestamp) column into UTC timestamp.

I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    return str(i)

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)

{code}
 

Below is the output of the above code:
{code:java}
++-+---+---+
|user|id   |ts 

[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17260911#comment-17260911
 ] 

Nasir Ali commented on SPARK-33863:
---

[~hyukjin.kwon] and [~viirya] I have simplified the example code, added/revised 
the details, and also added the actual and expected output. Please let me know 
if you need more information.
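
For what it's worth, the same logic written as a pandas UDF is sketched below. 
My assumption is that the Arrow-based path converts TimestampType values using 
spark.sql.session.timeZone (unlike the plain Python UDF), so if that holds the 
values the function sees should line up with the ts column. The day_part_pd 
name is just a placeholder and the sketch reuses the df from the repro:

{code:python}
# Sketch: day-part classification as a pandas UDF (Arrow path).
import pandas as pd

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

@F.pandas_udf(StringType())
def day_part_pd(ts: pd.Series) -> pd.Series:
    hours = ts.dt.hour
    rendered = ts.dt.strftime("%Y-%m-%d %H:%M:%S")
    labels = pd.Series("Night", index=ts.index)
    labels[(hours >= 5) & (hours < 12)] = "Morning: " + rendered
    labels[(hours >= 12) & (hours < 17)] = "Afternoon: " + rendered
    labels[(hours >= 17) & (hours < 21)] = "Evening"
    return labels

df.withColumn("day_part", day_part_pd("ts")).show(truncate=False)
{code}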

> Pyspark UDF wrongly changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
> column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
> different time zone. The ts (timestamp) column is already in UTC, so the 
> UDF should not apply any time zone conversion to it.
> I have used the following configs to let Spark know the timestamps are in UTC:
>  
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
> import datetime
> spark = SparkSession.builder.config("spark.sql.session.timeZone", 
> "UTC").getOrCreate()
>
> df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-02-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-02-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-02-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-02-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-02-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-02-22T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
>
> def some_time_udf(i):
>     return str(i)
>
> udf = F.udf(some_time_udf,StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
>  
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+-------------------+
> |user|id   |ts                 |day_part           |
> +----+-----+-------------------+-------------------+
> |usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
> +----+-----+-------------------+-------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the 
> same timestamps. Below is the output I would expect:
> {code:java}
> +----+-----+-------------------+-------------------+
> |user|id   |ts                 |day_part           |
> +----+-----+-------------------+-------------------+
> |usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
> |usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
> +----+-----+-------------------+-------------------+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

I have a dataframe with a ts (timestamp) column in UTC. If I create a new 
column using a UDF, the PySpark UDF wrongly shifts the timestamps to a 
different time zone. The ts (timestamp) column is already in UTC, so the UDF 
should not apply any time zone conversion to it.

I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-02-10T15:27:18+00:00"),
("usr1",13.00, "2018-02-11T12:27:18+00:00"),
("usr1",25.00, "2018-02-12T11:27:18+00:00"),
("usr1",20.00, "2018-02-13T15:27:18+00:00"),
("usr1",17.00, "2018-02-14T12:27:18+00:00"),
("usr2",99.00, "2018-02-15T11:27:18+00:00"),
("usr2",156.00, "2018-02-22T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    return str(i)

udf = F.udf(some_time_udf,StringType())
df.withColumn("day_part", udf(df.ts)).show(truncate=False)

{code}
 

Below is the output of the above code:
{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
+----+-----+-------------------+-------------------+
{code}
The above output is incorrect: the ts and day_part columns don't show the same 
timestamps. Below is the output I would expect:


{code:java}
+----+-----+-------------------+-------------------+
|user|id   |ts                 |day_part           |
+----+-----+-------------------+-------------------+
|usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
|usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
|usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
|usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
|usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
|usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
|usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
+----+-----+-------------------+-------------------+
{code}

  was:
*Problem*:

If I create a new column using udf, pyspark udf changes timestamps into UTC 
time. I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    tmp = ""
    if datetime.time(5, 0) <= i.time() < datetime.time(12, 0):
        tmp = "Morning -> " + str(i)
    return tmp

udf = F.udf(some_time_udf,StringType())

df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
I 

[jira] [Updated] (SPARK-33863) Pyspark UDF changes timestamps to UTC

2021-01-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-33863:
--
Description: 
*Problem*:

If I create a new column using a UDF, the PySpark UDF changes the timestamps into 
UTC time. I have used the following configs to let Spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    tmp = ""
    if datetime.time(5, 0) <= i.time() < datetime.time(12, 0):
        tmp = "Morning -> " + str(i)
    return tmp

udf = F.udf(some_time_udf,StringType())

df.withColumn("day_part", udf(df.ts)).show(truncate=False)


{code}
I have concatenated the timestamps with the label string to show that PySpark 
passes the timestamps as UTC.

  was:
*Problem*:

If I create a new column using udf, pyspark udf changes timestamps into UTC 
time. I have used following configs to let spark know the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.show(truncate=False)

def some_time_udf(i):
tmp=""
if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
tmp="Morning -> "+str(i)
elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
tmp= "Afternoon -> "+str(i)
elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
tmp= "Evening -> "+str(i)
elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
tmp= "Night -> "+str(i)
elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
tmp= "Night -> "+str(i)
return tmpsometimeudf = 
F.udf(some_time_udf,StringType())df.withColumn("day_part", 
sometimeudf("ts")).show(truncate=False)

{code}
I have concatenated the timestamps with the string to show that PySpark passes the 
timestamps as UTC.


> Pyspark UDF changes timestamps to UTC
> -
>
> Key: SPARK-33863
> URL: https://issues.apache.org/jira/browse/SPARK-33863
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 3.0.1
> Environment: MAC/Linux
> Standalone cluster / local machine
>Reporter: Nasir Ali
>Priority: Major
>
> *Problem*:
> If I create a new column using udf, pyspark udf changes timestamps into UTC 
> time. I have used following configs to let spark know the timestamps are in 
> UTC:
>  
> {code:java}
> --conf 

[jira] [Created] (SPARK-33863) Pyspark UDF changes timestamps to UTC

2020-12-20 Thread Nasir Ali (Jira)
Nasir Ali created SPARK-33863:
-

 Summary: Pyspark UDF changes timestamps to UTC
 Key: SPARK-33863
 URL: https://issues.apache.org/jira/browse/SPARK-33863
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 3.0.1
 Environment: MAC/Linux

Standalone cluster / local machine
Reporter: Nasir Ali


*Problem*:

If I create a new column using a UDF, the PySpark UDF changes the timestamps into UTC 
time. I have used the following configs to let Spark know that the timestamps are in UTC:

 
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC 
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:

 
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime
spark = SparkSession.builder.config("spark.sql.session.timeZone", 
"UTC").getOrCreate()

df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.show(truncate=False)

def some_time_udf(i):
tmp=""
if  datetime.time(5, 0)<=i.time() < datetime.time(12, 0):
tmp="Morning -> "+str(i)
elif  datetime.time(12, 0)<=i.time() < datetime.time(17, 0):
tmp= "Afternoon -> "+str(i)
elif  datetime.time(17, 0)<=i.time() < datetime.time(21, 0):
tmp= "Evening -> "+str(i)
elif  datetime.time(21, 0)<=i.time() < datetime.time(0, 0):
tmp= "Night -> "+str(i)
elif  datetime.time(0, 0)<=i.time() < datetime.time(5, 0):
tmp= "Night -> "+str(i)
return tmpsometimeudf = 
F.udf(some_time_udf,StringType())df.withColumn("day_part", 
sometimeudf("ts")).show(truncate=False)

{code}
I have concatenated the timestamps with the string to show that PySpark passes the 
timestamps as UTC.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation

2020-04-21 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089034#comment-17089034
 ] 

Nasir Ali commented on SPARK-30162:
---

[~dongjoon] This bug has not been fixed. Please have a look at my previous 
comment and the uploaded screenshots. Yes, you added logs for debugging, but it 
still does not perform filtering on the partition key.
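
For reference, a quick way to check whether the partition predicate actually reaches the scan (a sketch reusing the df from the snippet below; the /tmp path is only an example):

{code:java}
# Write a small partitioned dataset and inspect the plan text.
df.write.mode("overwrite").partitionBy("user").parquet("/tmp/pushdown_check")

df2 = spark.read.parquet("/tmp/pushdown_check")
df2.filter("user = 'usr2'").explain(True)
# If pruning works, the scan node should list the predicate (e.g. under
# PartitionFilters for the 2.4 FileScan), and the SQL tab in the UI should
# show only one partition's files being read.
{code}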

> Add PushedFilters to metadata in Parquet DSv2 implementation
> 
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Assignee: Hyukjin Kwon
>Priority: Minor
> Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from 
> 2020-01-01 21-01-32.png
>
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that 

[jira] [Comment Edited] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation

2020-01-01 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006548#comment-17006548
 ] 

Nasir Ali edited comment on SPARK-30162 at 1/2/20 3:05 AM:
---

This issue has not been fixed. It shows pushed filters, but it does not actually 
push the filters. For example, look at the 2.4.* output; there it clearly shows 
'user' as a partition filter, but the 3.* version does not. Have a look at the 
Spark UI SQL graph.

On a bigger dataset, Spark 2.4.* takes less than a second to show one row, 
whereas Spark 3.* takes way too long to produce the same result. The code is 
exactly the same (load parquet, filter on the partition key user).

 


was (Author: nasirali):
This issue has not been fixed. It shows pushed filters but it does not actually 
push filters. For example, look at 2.4.* output; there it clearly shows 'user' 
as partition filter but not in 3.* version.  Have a look at spark ui sql graph.

 

On a bigger dataset, spark 2.4.* takes less than a second to show one row. 
Whereas, spark 3.* takes 10+ seconds to produce the same result. Code is 
exactly same (load parquet, filter based on partition key user)

 

> Add PushedFilters to metadata in Parquet DSv2 implementation
> 
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from 
> 2020-01-01 21-01-32.png
>
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  

[jira] [Comment Edited] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation

2020-01-01 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006548#comment-17006548
 ] 

Nasir Ali edited comment on SPARK-30162 at 1/2/20 3:04 AM:
---

This issue has not been fixed. It shows pushed filters, but it does not actually 
push the filters. For example, look at the 2.4.* output; there it clearly shows 
'user' as a partition filter, but the 3.* version does not. Have a look at the 
Spark UI SQL graph.

On a bigger dataset, Spark 2.4.* takes less than a second to show one row, 
whereas Spark 3.* takes 10+ seconds to produce the same result. The code is 
exactly the same (load parquet, filter on the partition key user).

 


was (Author: nasirali):
This issue has not been fixed. It shows pushed filters but it does not actually 
push filters. For example, look at 2.4.* output; there it clearly shows 'user' 
as partition filter but not in 3.* version. 

 

> Add PushedFilters to metadata in Parquet DSv2 implementation
> 
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from 
> 2020-01-01 21-01-32.png
>
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.

[jira] [Updated] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation

2020-01-01 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Attachment: Screenshot from 2020-01-01 21-01-32.png
Screenshot from 2020-01-01 21-01-18.png

> Add PushedFilters to metadata in Parquet DSv2 implementation
> 
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 3.0.0
>
> Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from 
> 2020-01-01 21-01-32.png
>
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be 
> overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS 

[jira] [Reopened] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation

2020-01-01 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali reopened SPARK-30162:
---

This issue has not been fixed. It shows pushed filters, but it does not actually 
push the filters. For example, look at the 2.4.* output; there it clearly shows 
'user' as a partition filter, but the 3.* version does not.

 

> Add PushedFilters to metadata in Parquet DSv2 implementation
> 
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Assignee: Hyukjin Kwon
>Priority: Minor
> Fix For: 3.0.0
>
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be 
> overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
> 

[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-12-13 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995815#comment-16995815
 ] 

Nasir Ali edited comment on SPARK-28502 at 12/13/19 6:45 PM:
-

{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import StructField, StructType, StringType, FloatType, MapType
from pyspark.sql.types import TimestampType, IntegerType, DateType

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])

    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column. It throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in 
returnType
to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, 
in to_arrow_type
raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to Arrow

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "", line 1, in 
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in 
_create_udf
return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in 
_wrapped
wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in 
returnType
"%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: 
StructType(List(StructField(timestamp,TimestampType,true),StructField(window,StructType(List(StructField(start,TimestampType,true),StructField(end,TimestampType,true))),true),StructField(some_val,StringType,true)))
 is not supported
{code}
However, if I manually run *to_arrow_schema(schema)*, it works fine and there is 
no exception. 
[https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L139]
{code:java}
from pyspark.sql.types import to_arrow_schema
to_arrow_schema(schema)
{code}
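
One possible workaround (a sketch, assuming a df with the id and ts columns from the issue description): return only flat columns from the grouped-map UDF and re-derive the window struct afterwards, so the Arrow return schema never contains a nested StructType:

{code:java}
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, TimestampType
import pandas as pd

# Flat return schema: no nested struct, so the Arrow conversion check passes.
flat_schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("some_val", StringType())
])

@pandas_udf(flat_schema, PandasUDFType.GROUPED_MAP)
def get_vals(user_data):
    rows = [[row["timestamp"], "tesss"] for _, row in user_data.iterrows()]
    return pd.DataFrame(rows, columns=["timestamp", "some_val"])

result = (df.groupby("id", F.window("ts", "15 days"))
            .apply(get_vals)
            # Rebuild the window struct outside the UDF, where nested types are fine.
            .withColumn("window", F.window("timestamp", "15 days")))
{code}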


was (Author: nasirali):
{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import StructField, StructType, StringType, FloatType, 
MapType
from pyspark.sql.types import StructField, StructType, StringType, FloatType, 
TimestampType, IntegerType,DateType,TimestampTypeschema = StructType([
   StructField("timestamp", TimestampType()),
   StructField("window", StructType([
   StructField("start", TimestampType()),
   StructField("end", TimestampType())])),
   StructField("some_val", StringType())
   ])@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
all_vals = []
for index, row in user_data.iterrows():
all_vals.append([row["timestamp"],key[2],"tesss"])

return pd.DataFrame(all_vals,columns=['timestamp','window','some_val'])
{code}
I am not even able to manually return window column. It throws error
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in 
returnType
to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, 
in to_arrow_type
raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to ArrowDuring 
handling of the above exception, another exception occurred:Traceback (most 
recent call last):
  File "", line 1, in 
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in 
_create_udf
return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in 
_wrapped
wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in 
returnType
"%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map 

[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-12-13 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995815#comment-16995815
 ] 

Nasir Ali commented on SPARK-28502:
---

{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import StructField, StructType, StringType, FloatType, MapType
from pyspark.sql.types import TimestampType, IntegerType, DateType

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])

    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column. It throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in 
returnType
to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, 
in to_arrow_type
raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to ArrowDuring 
handling of the above exception, another exception occurred:Traceback (most 
recent call last):
  File "", line 1, in 
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in 
_create_udf
return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in 
_wrapped
wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in 
returnType
"%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: 
StructType(List(StructField(timestamp,TimestampType,true),StructField(window,StructType(List(StructField(start,TimestampType,true),StructField(end,TimestampType,true))),true),StructField(some_val,StringType,true)))
 is not supported
{code}
However, if I manually run *to_arrow_schema(schema)*, it works fine and there is 
no exception. 
{code:java}
from pyspark.sql.types import to_arrow_schema
to_arrow_schema(schema)
{code}

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 

[jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL

2019-12-11 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993981#comment-16993981
 ] 

Nasir Ali commented on SPARK-22947:
---

[~icexelloss] Is there any update on this issue? Or is there now an alternative 
way in Spark to perform such a task?
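
In the meantime, one rough way to approximate it with existing APIs (a sketch, assuming dfA(time, id, quantity) and dfB(time, id, price) shaped like the example in the proposal quoted below):

{code:java}
from pyspark.sql import functions as F, Window

# For every dfA row keep the latest dfB row with the same id and b.time <= a.time.
joined = (dfA.alias("a")
             .join(dfB.alias("b"),
                   (F.col("a.id") == F.col("b.id")) & (F.col("b.time") <= F.col("a.time")),
                   "left"))

w = Window.partitionBy(F.col("a.id"), F.col("a.time")).orderBy(F.col("b.time").desc())

asof = (joined.withColumn("rn", F.row_number().over(w))
              .filter(F.col("rn") == 1)
              .select(F.col("a.time").alias("time"),
                      F.col("a.id").alias("id"),
                      F.col("a.quantity").alias("quantity"),
                      F.col("b.price").alias("price")))
{code}
A tolerance such as the proposal's one-day lookback could be added as one more predicate in the join condition; this sketch does not implement it.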

> SPIP: as-of join in Spark SQL
> -
>
> Key: SPARK-22947
> URL: https://issues.apache.org/jira/browse/SPARK-22947
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.1
>Reporter: Li Jin
>Priority: Major
> Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common analysis on financial data. In 
> time series analysis, as-of join is a very common operation. Supporting as-of 
> join in Spark SQL will allow many use cases of using Spark SQL for time 
> series analysis.
> As-of join is “join on time” with inexact time matching criteria. Various 
> library has implemented asof join or similar functionality:
> Kdb: https://code.kx.com/wiki/Reference/aj
> Pandas: 
> http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> R: This functionality is called “Last Observation Carried Forward”
> https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing new API in Spark SQL to support as-of 
> join.
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of join of multiple table (>2) should be performant, because it’s very 
> common that users need to join multiple data sources together for further 
> analysis.
> * Define Distribution, Partitioning and shuffle strategy for ordered time 
> series data
> h2. Non-Goals
> These are out of scope for the existing SPIP, should be considered in future 
> SPIP as improvement to Spark’s time series analysis ability:
> * Utilize partition information from data source, i.e, begin/end of each 
> partition to reduce sorting/shuffling
> * Define API for user to implement asof join time spec in business calendar 
> (i.e. lookback one business day, this is very common in financial data 
> analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis, it has 
> begin time (inclusive) and end time (exclusive). User should be able to 
> change the time scope of the analysis (i.e, from one month to five year) by 
> just changing the TimeContext. 
> To Spark engine, TimeContext is a hint that:
> can be used to repartition data for join
> serve as a predicate that can be pushed down to storage layer
> Time context is similar to filtering time by begin/end, the main difference 
> is that time context can be expanded based on the operation taken (see 
> example in as-of join).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
> h3. asofJoin
> h4. User Case A (join without key)
> Join two DataFrames on time, with one day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This 
> is an important illustration of the time context - it is able to expand the 
> context to 20151231 on dfB because of the 1 day lookback.
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify 
> the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = 
> JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, id, quantity
> 20160101, 1, 100
> 20160101, 2, 50
> 20160102, 1, -50
> 20160102, 2, 50
> dfB:
> time, id, price
> 20151231, 1, 100.0
> 20150102, 1, 105.0
> 20150102, 2, 195.0
> Output:
> time, id, quantity, price
> 20160101, 1, 100, 100.0
> 20160101, 2, 50, null
> 20160102, 1, -50, 105.0
> 20160102, 2, 50, 195.0
> {code}
> h2. Optional Design Sketch
> h3. Implementation 

[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-09 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991573#comment-16991573
 ] 

Nasir Ali commented on SPARK-30162:
---

[~aman_omer] I have added it in my question.

> Filter is not being pushed down for Parquet files
> -
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Priority: Major
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be 
> overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
> mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
>   /_/Using 

[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-09 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Description: 
Filters are not pushed down in the Spark 3.0 preview. Also, the output of the 
"explain" method is different. It is hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data 
and then apply the filter, whereas Spark 2.4 pushes down the filter. The above 
output shows that Spark 2.4 applied the partition filter but the Spark 3.0 
preview did not.

 

Minor: in Spark 3.0 the "explain()" output is truncated (maybe to a fixed 
length?), which makes it hard to debug.  spark.sql.orc.cache.stripe.details.size=1 
doesn't work.
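
As a side note, Spark 3.0 also added explain modes that render the plan in a more readable form (assuming the 3.0 explain(mode=...) API is available):

{code:java}
# "formatted" prints a compact operator tree plus per-node details,
# which makes it easier to spot pushed or partition filters.
df2.filter("user = 'usr2'").explain(mode="formatted")
{code}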

 
{code:java}
// pyspark 3 shell output
$ pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Warning: Ignoring non-spark config property: 
java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be overridden 
by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
  /_/

Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
SparkSession available as 'spark'.
{code}
{code:java}
// pyspark 2.4.4 shell output
pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
2019-12-09 07:09:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /__ / 

[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-07 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990741#comment-16990741
 ] 

Nasir Ali commented on SPARK-30162:
---

No, I only use pyspark shell.

> Filter is not being pushed down for Parquet files
> -
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Priority: Major
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=1 doesn't 
> work.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-07 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Description: 
Filters are not pushed down in the Spark 3.0 preview. Also, the output of the 
"explain" method is different. It is hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data 
and then apply the filter, whereas Spark 2.4 pushes down the filter. The above 
output shows that Spark 2.4 applied the partition filter but the Spark 3.0 
preview did not.

 

Minor: in Spark 3.0 the "explain()" output is truncated (maybe to a fixed 
length?), which makes it hard to debug.  spark.sql.orc.cache.stripe.details.size=1 
doesn't work.

 

  was:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], 

[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-06 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Description: 
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
ReadSchema: struct{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan 
==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
  +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data 
and then applies the filter, whereas Spark 2.4 pushes the filter down. The output 
above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview 
did not.

 

Minor: in Spark 3.0 the "explain()" output is truncated (to a fixed length, 
maybe?), which makes debugging hard. Setting 
spark.sql.orc.cache.stripe.details.size=1 doesn't help.

 

  was:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], 

[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-06 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Description: 
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
ReadSchema: struct{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan 
==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
  +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data 
and then applies the filter, whereas Spark 2.4 pushes the filter down. The output 
above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview 
did not.

 

Minor: in Spark 3.0 the output is truncated (to a fixed length, maybe?), which 
makes debugging hard. Setting spark.sql.orc.cache.stripe.details.size=1 doesn't 
help.

 

  was:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")
df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 

[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files

2019-12-06 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
--
Summary: Filter is not being pushed down for Parquet files  (was: Filters 
is not being pushed down for Parquet files)

> Filter is not being pushed down for Parquet files
> -
>
> Key: SPARK-30162
> URL: https://issues.apache.org/jira/browse/SPARK-30162
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0
> Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>Reporter: Nasir Ali
>Priority: Major
>
> Filters are not pushed down in the Spark 3.0 preview. The output of the 
> "explain" method is also different, which makes it hard to tell in 3.0 whether 
> filters were pushed down or not. The code below reproduces the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
> ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
> ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
> ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
> ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
> ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
> ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
> ("usr2",25.00, "2018-03-16T11:27:18+00:00")
> ],
>["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")
> df2 = spark.read.load("/home/cnali/data/")
> df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>+- *(1) ColumnarToRow
>   +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct
> {code}
> I have tested it on a much larger dataset. Spark 3.0 tries to load the whole 
> data and then applies the filter, whereas Spark 2.4 pushes the filter down. The 
> output above shows that Spark 2.4 applied the partition filter but the Spark 
> 3.0 preview did not.
>  
> Minor: in Spark 3.0 the output is truncated (to a fixed length, maybe?), which 
> makes debugging hard. Setting spark.sql.orc.cache.stripe.details.size=1 doesn't 
> help.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30162) Filters is not being pushed down for Parquet files

2019-12-06 Thread Nasir Ali (Jira)
Nasir Ali created SPARK-30162:
-

 Summary: Filters is not being pushed down for Parquet files
 Key: SPARK-30162
 URL: https://issues.apache.org/jira/browse/SPARK-30162
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
 Environment: pyspark 3.0 preview

Ubuntu/Centos

pyarrow 0.14.1 
Reporter: Nasir Ali


Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method is also different, which makes it hard to tell in 3.0 whether filters were 
pushed down or not. The code below reproduces the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
("usr1",13.00, "2018-03-11T12:27:18+00:00"),
("usr1",25.00, "2018-03-12T11:27:18+00:00"),
("usr1",20.00, "2018-03-13T15:27:18+00:00"),
("usr1",17.00, "2018-03-14T12:27:18+00:00"),
("usr2",99.00, "2018-03-15T11:27:18+00:00"),
("usr2",156.00, "2018-03-22T11:27:18+00:00"),
("usr2",17.00, "2018-03-31T11:27:18+00:00"),
("usr2",25.00, "2018-03-15T11:27:18+00:00"),
("usr2",25.00, "2018-03-16T11:27:18+00:00")
],
   ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")
df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
ReadSchema: struct{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
  +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data 
and then applies the filter, whereas Spark 2.4 pushes the filter down. The output 
above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview 
did not.

 

Minor: in Spark 3.0 the output is truncated (to a fixed length, maybe?), which 
makes debugging hard. Setting spark.sql.orc.cache.stripe.details.size=1 doesn't 
help.
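
A possible way to narrow the regression down, sketched under the assumption that 
the 3.0 build honors the spark.sql.sources.useV1SourceList option: forcing 
Parquet back onto the V1 file source should bring back the FileScan node with 
PartitionFilters, which makes the V1 and V2 plans easy to compare side by side.

{code:python}
# Hypothetical comparison (assumes spark.sql.sources.useV1SourceList exists in
# the build under test): read the same data through the V1 and V2 code paths.
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")   # V1 FileScan path
spark.read.load("/home/cnali/data/").filter("user=='usr2'").explain(True)

spark.conf.set("spark.sql.sources.useV1SourceList", "")          # V2 BatchScan path
spark.read.load("/home/cnali/data/").filter("user=='usr2'").explain(True)
{code}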

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-06 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16968703#comment-16968703
 ] 

Nasir Ali edited comment on SPARK-28502 at 11/6/19 9:17 PM:


[~bryanc] If I perform any agg (e.g., _df.groupBy("id", F.window("ts", "15 
days")).agg(\{"id": "avg"}).show()_ ) on grouped data, pyspark returns the key 
(e.g., id, window) along with the avg for each group. However, in the above 
example, when the udf returns the struct, it does not automatically return the 
key; I have to manually add the window to the returned dataframe. Is there a way 
to automatically concatenate the results of the udf with the key?
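
In the meantime, a manual workaround can be sketched like this. It relies on the 
two-argument GROUPED_MAP form (available since Spark 2.4), where the grouping key 
is passed to the udf as the first argument; the exact shape of the window part of 
that key is an assumption here, and the win_start/win_end column names are made 
up for the example:

{code:python}
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

# Output schema now carries the window boundaries explicitly.
schema = StructType([
    StructField("id", DoubleType()),
    StructField("ts", TimestampType()),
    StructField("win_start", TimestampType()),
    StructField("win_end", TimestampType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(key, pdf):
    # Two-argument form: `key` holds the grouping values. Assumption: the
    # window part arrives as a (start, end) pair; adjust if it differs.
    _, window_key = key
    pdf = pdf.copy()
    pdf["win_start"] = window_key[0]
    pdf["win_end"] = window_key[1]
    return pdf

df.groupby("id", F.window("ts", "15 days")).apply(some_udf).show(truncate=False)
{code}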


was (Author: nasirali):
[~bryanc] If I perform any agg (e.g., avg) on grouped data, pyspark returns the 
key (e.g., window etc.) along with the avg for each row. However, in the above 
example, when the udf returns the struct, it does not automatically return the 
key; I have to manually add the window to the returned dataframe. Is there a way 
to automatically concatenate the results of the udf with the key?

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-06 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16968703#comment-16968703
 ] 

Nasir Ali commented on SPARK-28502:
---

[~bryanc] If I perform any agg (e.g., avg) on grouped data, pyspark returns the 
key (e.g., window etc.) along with the avg for each row. However, in the above 
example, when the udf returns the struct, it does not automatically return the 
key; I have to manually add the window to the returned dataframe. Is there a way 
to automatically concatenate the results of the udf with the key?

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali reopened SPARK-28502:
---

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967977#comment-16967977
 ] 

Nasir Ali edited comment on SPARK-28502 at 11/6/19 12:38 AM:
-

[~bryanc] Sorry, I had to remove my previous comment as I was in the middle of 
debugging this issue. I found the culprit package: my code works fine with 
pyarrow==0.14.1, but with the latest pyarrow release (0.15.1) my example code 
throws the following exception. Please let me know if you need more information.

 
{code:java}
// code placeholder
19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13)
java.lang.IllegalArgumentException
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
 at 
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
 at 
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
 at 
org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
 at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
 at 
org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
 at 
org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:127)
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
19/11/05 18:23:17 ERROR TaskSetManager: Task 13 in stage 5.0 failed 1 times; 
aborting job
{code}


was (Author: nasirali):
Sorry, I had to remove my previous comment as I was in the middle of debugging 
this issue. I found the culprit package: my code works fine with pyarrow==0.14.1, 
but with the latest pyarrow release (0.15.1) my example code throws the following 
exception. Please let me know if you need more information.

 
{code:java}
// code placeholder
19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13)
java.lang.IllegalArgumentException
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
 at 
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
 at 
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
 at 
org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
 at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
 at 
org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
 at 
org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
 at 

[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967977#comment-16967977
 ] 

Nasir Ali commented on SPARK-28502:
---

Sorry, I had to remove my previous comment as I was in the middle of debugging 
this issue. I found the culprit package: my code works fine with pyarrow==0.14.1, 
but with the latest pyarrow release (0.15.1) my example code throws the following 
exception. Please let me know if you need more information.

 
{code:java}
// code placeholder
19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13)
java.lang.IllegalArgumentException
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
 at 
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
 at 
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
 at 
org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
 at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
 at 
org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
 at 
org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
 at 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
 at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
 at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
 at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
 at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
 at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
 at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
 at 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
 at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
 at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:127)
 at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
19/11/05 18:23:17 ERROR TaskSetManager: Task 13 in stage 5.0 failed 1 times; 
aborting job
{code}
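
For anyone hitting the same trace, two workarounds that have been suggested for 
the pyarrow 0.15 IPC format change (neither verified against this exact build): 
pin pyarrow below 0.15, or, on Spark builds that still bundle an Arrow Java older 
than 0.15, have pyarrow emit the legacy IPC format via the 
ARROW_PRE_0_15_IPC_FORMAT environment variable. A rough sketch:

{code:python}
# Sketch of the legacy-IPC workaround; assumes the JVM side runs Arrow Java
# < 0.15. Alternatively: pip install "pyarrow<0.15" to pin the old release.
import os
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"  # driver-side Python workers

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Propagate the same flag to executor-side Python workers.
         .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
         .getOrCreate())
{code}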

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, 

[jira] [Resolved] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali resolved SPARK-28502.
---
Resolution: Fixed

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-28502:
--
Comment: was deleted

(was: [~bryanc] I have tested the same code on the newly released Spark version 
([v3.0.0-preview-rc2|https://github.com/apache/spark/releases/tag/v3.0.0-preview-rc2])
 from git. However, it failed with the same exception mentioned in the earlier bug 
report.)

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-11-05 Thread Nasir Ali (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali reopened SPARK-28502:
---

[~bryanc] I have tested the same code on the newly released Spark version 
([v3.0.0-preview-rc2|https://github.com/apache/spark/releases/tag/v3.0.0-preview-rc2])
 from git. However, it failed with the same exception mentioned in the earlier 
bug report.

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-10-10 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948832#comment-16948832
 ] 

Nasir Ali commented on SPARK-28502:
---

[~bryanc] I tested it and it works fine with the master branch. Is there an 
expected release date for version 3? Or could this bug fix be integrated into the 
next release?

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
> Fix For: 3.0.0
>
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-09-21 Thread Nasir Ali (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935065#comment-16935065
 ] 

Nasir Ali commented on SPARK-28502:
---

any update?

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-+--+---+
> |id   |window|avg(id)|
> +-+--+---+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-+--+---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-07-26 Thread Nasir Ali (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894238#comment-16894238
 ] 

Nasir Ali commented on SPARK-28502:
---

I tried to set the timezone to UTC as suggested by [~icexelloss], but it didn't 
solve the problem. It throws the following error (the same error, but with tz=UTC).

 
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct
{code}
I think this is a conversion error, maybe? Below is the complete trace:

 
{code:java}
df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
 line 378, in show
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
 line 1257, in __call__
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in 
stage 5.0 failed 1 times, most recent failure: Lost task 13.0 in stage 5.0 (TID 
32, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last):
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",
 line 372, in main
process()
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py",
 line 367, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
 line 283, in dump_stream
for series in iterator:
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
 line 301, in load_stream
yield [self.arrow_to_pandas(c) for c in 
pa.Table.from_batches([batch]).itercolumns()]
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
 line 301, in <listcomp>
yield [self.arrow_to_pandas(c) for c in 
pa.Table.from_batches([batch]).itercolumns()]
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py",
 line 271, in arrow_to_pandas
s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
File 
"/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 1672, in from_arrow_type
raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
TypeError: Unsupported type in conversion from Arrow: struct
{code}

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
> # some computation
> return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use builtin agg method then it works all fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 

[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-07-25 Thread Nasir Ali (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-28502:
--
Issue Type: Bug  (was: Question)

> Error with struct conversion while using pandas_udf
> ---
>
> Key: SPARK-28502
> URL: https://issues.apache.org/jira/browse/SPARK-28502
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
> Environment: OS: Ubuntu
> Python: 3.6
>Reporter: Nasir Ali
>Priority: Minor
>
> What I am trying to do: Group data based on time intervals (e.g., 15 days 
> window) and perform some operations on dataframe using (pandas) UDFs. I don't 
> know if there is a better/cleaner way to do it.
> Below is the sample code that I tried and error message I am getting.
>  
> {code:java}
> df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
> (13.00, "2018-03-11T12:27:18+00:00"),
> (25.00, "2018-03-12T11:27:18+00:00"),
> (20.00, "2018-03-13T15:27:18+00:00"),
> (17.00, "2018-03-14T12:27:18+00:00"),
> (99.00, "2018-03-15T11:27:18+00:00"),
> (156.00, "2018-03-22T11:27:18+00:00"),
> (17.00, "2018-03-31T11:27:18+00:00"),
> (25.00, "2018-03-15T11:27:18+00:00"),
> (25.00, "2018-03-16T11:27:18+00:00")
> ],
>["id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> schema = StructType([
> StructField("id", IntegerType()),
> StructField("ts", TimestampType())
> ])
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def some_udf(df):
>     # some computation
>     return df
> df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
> {code}
> This throws the following exception:
> {code:java}
> TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
> {code}
>  
> However, if I use a builtin agg method, it works fine. For example,
> {code:java}
> df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
> {code}
> Output
> {code:java}
> +-----+------------------------------------------+-------+
> |id   |window                                    |avg(id)|
> +-----+------------------------------------------+-------+
> |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
> |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
> |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
> |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
> |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
> |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
> |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
> +-----+------------------------------------------+-------+
> {code}






[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-07-24 Thread Nasir Ali (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-28502:
--
Description: 
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner way to do it.

Below is the sample code that I tried and the error message I am getting.

 
{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
(13.00, "2018-03-11T12:27:18+00:00"),
(25.00, "2018-03-12T11:27:18+00:00"),
(20.00, "2018-03-13T15:27:18+00:00"),
(17.00, "2018-03-14T12:27:18+00:00"),
(99.00, "2018-03-15T11:27:18+00:00"),
(156.00, "2018-03-22T11:27:18+00:00"),
(17.00, "2018-03-31T11:27:18+00:00"),
(25.00, "2018-03-15T11:27:18+00:00"),
(25.00, "2018-03-16T11:27:18+00:00")
],
   ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
StructField("id", IntegerType()),
StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method, it works fine. For example,
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}
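If flat start/end columns are needed downstream of the builtin aggregation, the window struct can be expanded after the agg (a small sketch, assuming the same df and imports as in the snippet above; w_start/w_end are illustrative names):
{code:python}
# Expand the aggregated window struct into flat timestamp columns.
agg = df.groupby("id", F.window("ts", "15 days")).mean()
flat = (agg
        .withColumn("w_start", F.col("window.start"))
        .withColumn("w_end", F.col("window.end"))
        .drop("window"))
flat.show(truncate=False)
{code}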

  was:
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner solution to do it.

Below is the sample code that I tried and the error message I am getting.

 
{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
(13.00, "2018-03-11T12:27:18+00:00"),
(25.00, "2018-03-12T11:27:18+00:00"),
(20.00, "2018-03-13T15:27:18+00:00"),
(17.00, "2018-03-14T12:27:18+00:00"),
(99.00, "2018-03-15T11:27:18+00:00"),
(156.00, "2018-03-22T11:27:18+00:00"),
(17.00, "2018-03-31T11:27:18+00:00"),
(25.00, "2018-03-15T11:27:18+00:00"),
(25.00, "2018-03-16T11:27:18+00:00")
],
   ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
StructField("id", IntegerType()),
StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method, it works fine. For example,
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}


> Error with struct conversion while using pandas_udf
> ---
>
> Key: 

[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-07-24 Thread Nasir Ali (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-28502:
--
Description: 
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner solution to do it.

Below is the sample code that I tried and the error message I am getting.

 
{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
(13.00, "2018-03-11T12:27:18+00:00"),
(25.00, "2018-03-12T11:27:18+00:00"),
(20.00, "2018-03-13T15:27:18+00:00"),
(17.00, "2018-03-14T12:27:18+00:00"),
(99.00, "2018-03-15T11:27:18+00:00"),
(156.00, "2018-03-22T11:27:18+00:00"),
(17.00, "2018-03-31T11:27:18+00:00"),
(25.00, "2018-03-15T11:27:18+00:00"),
(25.00, "2018-03-16T11:27:18+00:00")
],
   ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
StructField("id", IntegerType()),
StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method, it works fine. For example,
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}

  was:
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I am new to pyspark. I don't know if there is a better/cleaner solution to do it.

Below is the sample code that I tried and the error message I am getting.

 
{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
(13.00, "2018-03-11T12:27:18+00:00"),
(25.00, "2018-03-12T11:27:18+00:00"),
(20.00, "2018-03-13T15:27:18+00:00"),
(17.00, "2018-03-14T12:27:18+00:00"),
(99.00, "2018-03-15T11:27:18+00:00"),
(156.00, "2018-03-22T11:27:18+00:00"),
(17.00, "2018-03-31T11:27:18+00:00"),
(25.00, "2018-03-15T11:27:18+00:00"),
(25.00, "2018-03-16T11:27:18+00:00")
],
   ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
StructField("id", IntegerType()),
StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method, it works fine. For example,
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}


> Error with struct conversion while using pandas_udf
> ---
>
>   

[jira] [Created] (SPARK-28502) Error with struct conversion while using pandas_udf

2019-07-24 Thread Nasir Ali (JIRA)
Nasir Ali created SPARK-28502:
-

 Summary: Error with struct conversion while using pandas_udf
 Key: SPARK-28502
 URL: https://issues.apache.org/jira/browse/SPARK-28502
 Project: Spark
  Issue Type: Question
  Components: PySpark
Affects Versions: 2.4.3
 Environment: OS: Ubuntu

Python: 3.6
Reporter: Nasir Ali


What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I am new to pyspark. I don't know if there is a better/cleaner solution to do it.

Below is the sample code that I tried and the error message I am getting.

 
{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
(13.00, "2018-03-11T12:27:18+00:00"),
(25.00, "2018-03-12T11:27:18+00:00"),
(20.00, "2018-03-13T15:27:18+00:00"),
(17.00, "2018-03-14T12:27:18+00:00"),
(99.00, "2018-03-15T11:27:18+00:00"),
(156.00, "2018-03-22T11:27:18+00:00"),
(17.00, "2018-03-31T11:27:18+00:00"),
(25.00, "2018-03-15T11:27:18+00:00"),
(25.00, "2018-03-16T11:27:18+00:00")
],
   ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
StructField("id", IntegerType()),
StructField("ts", TimestampType())
])


@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}
This throws the following exception:
{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method, it works fine. For example,
{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}
Output
{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}
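
A grouped-aggregate pandas UDF may also sidestep the struct conversion, since only the UDF's argument columns cross the Arrow boundary while the window grouping key stays on the JVM side. A minimal, untested sketch assuming Spark 2.4.x with PyArrow installed (mean_udf is an illustrative name):
{code:python}
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()

df = (spark.createDataFrame(
          [(17.00, "2018-03-10T15:27:18+00:00"),
           (99.00, "2018-03-15T11:27:18+00:00"),
           (156.00, "2018-03-22T11:27:18+00:00")],
          ["id", "ts"])
      .withColumn("ts", F.col("ts").cast("timestamp")))

# Grouped-aggregate pandas UDF: receives a pandas Series per group, returns a scalar.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Only the `id` values are serialized through Arrow; the window struct is
# handled as a grouping key on the JVM side.
df.groupby("id", F.window("ts", "15 days")).agg(mean_udf(df.id)).show(truncate=False)
{code}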


