[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17368550#comment-17368550 ]

Nasir Ali commented on SPARK-33863:
-----------------------------------

[~dc-heros] Could you please share the output you got when you ran the above code?

> Pyspark UDF wrongly changes timestamps to UTC
> ---------------------------------------------
>
>                 Key: SPARK-33863
>                 URL: https://issues.apache.org/jira/browse/SPARK-33863
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1, 3.0.2, 3.1.0, 3.1.1, 3.1.2
>         Environment: MAC/Linux
>                      Standalone cluster / local machine
>            Reporter: Nasir Ali
>            Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new column using a UDF, the PySpark UDF wrongly shifts the timestamps. The ts (timestamp) column is already in UTC time, so the UDF should receive the timestamps unchanged.
> I have used the following configs to let Spark know the timestamps are in UTC:
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
> import datetime
>
> spark = SparkSession.builder.config("spark.sql.session.timeZone", "UTC").getOrCreate()
>
> df = spark.createDataFrame([("usr1", 17.00, "2018-02-10T15:27:18+00:00"),
>                             ("usr1", 13.00, "2018-02-11T12:27:18+00:00"),
>                             ("usr1", 25.00, "2018-02-12T11:27:18+00:00"),
>                             ("usr1", 20.00, "2018-02-13T15:27:18+00:00"),
>                             ("usr1", 17.00, "2018-02-14T12:27:18+00:00"),
>                             ("usr2", 99.00, "2018-02-15T11:27:18+00:00"),
>                             ("usr2", 156.00, "2018-02-22T11:27:18+00:00")
>                             ],
>                            ["user", "id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
>
> def some_time_udf(i):
>     if datetime.time(5, 0) <= i.time() < datetime.time(12, 0):
>         tmp = "Morning: " + str(i)
>     elif datetime.time(12, 0) <= i.time() < datetime.time(17, 0):
>         tmp = "Afternoon: " + str(i)
>     elif datetime.time(17, 0) <= i.time() < datetime.time(21, 0):
>         tmp = "Evening"
>     elif datetime.time(21, 0) <= i.time():  # an upper bound of time(0, 0) would never match
>         tmp = "Night"
>     else:  # midnight to 5am
>         tmp = "Night"
>     return tmp
>
> udf = F.udf(some_time_udf, StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+----------------------------+
> |user|id   |ts                 |day_part                    |
> +----+-----+-------------------+----------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Morning: 2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Morning: 2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|Morning: 2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Morning: 2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 05:27:18|
> +----+-----+-------------------+----------------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the same timestamps.
> Below is the output I would expect:
> {code:java}
> +----+-----+-------------------+------------------------------+
> |user|id   |ts                 |day_part                      |
> +----+-----+-------------------+------------------------------+
> |usr1|17.0 |2018-02-10 15:27:18|Afternoon: 2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|Afternoon: 2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|Morning: 2018-02-12 11:27:18  |
> |usr1|20.0 |2018-02-13 15:27:18|Afternoon: 2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|Afternoon: 2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|Morning: 2018-02-15 11:27:18  |
> |usr2|156.0|2018-02-22 11:27:18|Morning: 2018-02-22 11:27:18  |
> +----+-----+-------------------+------------------------------+
> {code}
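[Editor's note] The shift in the reported output is consistent with the Python worker converting Spark's internal UTC epoch microseconds to a naive datetime in the worker's OS timezone. A minimal pure-Python illustration of that suspected mechanism (editor's sketch; the epoch value and the US Central offset are assumptions, not part of the report):

{code:java}
import datetime

# Spark stores TIMESTAMP values as microseconds since the Unix epoch (UTC).
# 1518276438 corresponds to 2018-02-10T15:27:18+00:00 from the repro data.
epoch_seconds = 1518276438

# What the UDF "should" see if the value were rendered in UTC:
print(datetime.datetime.utcfromtimestamp(epoch_seconds))  # 2018-02-10 15:27:18

# What a worker whose OS timezone is, e.g., US Central (UTC-6 in February)
# produces instead -- matching the "09:27:18" in the reported output.
# -Duser.timezone configures only the JVM, not this Python-side conversion.
print(datetime.datetime.fromtimestamp(epoch_seconds))
{code}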
[jira] [Reopened] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali reopened SPARK-33863:
-------------------------------

Issue is not resolved.
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
    Affects Version/s: 3.1.2
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
    Affects Version/s: 3.0.2
                       3.1.0
                       3.1.1
[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17341075#comment-17341075 ]

Nasir Ali commented on SPARK-33863:
-----------------------------------

[~hyukjin.kwon] and [~viirya] This bug exists in all 3.x versions of PySpark. Any update or suggestion?
[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17288802#comment-17288802 ]

Nasir Ali commented on SPARK-33863:
-----------------------------------

[~hyukjin.kwon] and [~viirya] any update on this issue?
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
    Description: If I change the return type to TimestampType then 'day_part' will have the correct timestamp.
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Commented] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17260911#comment-17260911 ]

Nasir Ali commented on SPARK-33863:
-----------------------------------

[~hyukjin.kwon] and [~viirya] I have simplified the example code, added/revised the details, and also added the actual and expected output. Please let me know if you need more information.

> Pyspark UDF wrongly changes timestamps to UTC
> ---------------------------------------------
>
>                 Key: SPARK-33863
>                 URL: https://issues.apache.org/jira/browse/SPARK-33863
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.0.1
>         Environment: MAC/Linux
>                      Standalone cluster / local machine
>            Reporter: Nasir Ali
>            Priority: Major
>
> *Problem*:
> I have a dataframe with a ts (timestamp) column in UTC. If I create a new column using a UDF, the PySpark UDF wrongly shifts the timestamps. The ts (timestamp) column is already in UTC time, so the UDF should receive the timestamps unchanged.
> I have used the following configs to let Spark know the timestamps are in UTC:
> {code:java}
> --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
> --conf spark.sql.session.timeZone=UTC
> {code}
> Below is a code snippet to reproduce the error:
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
> import datetime
>
> spark = SparkSession.builder.config("spark.sql.session.timeZone", "UTC").getOrCreate()
>
> df = spark.createDataFrame([("usr1", 17.00, "2018-02-10T15:27:18+00:00"),
>                             ("usr1", 13.00, "2018-02-11T12:27:18+00:00"),
>                             ("usr1", 25.00, "2018-02-12T11:27:18+00:00"),
>                             ("usr1", 20.00, "2018-02-13T15:27:18+00:00"),
>                             ("usr1", 17.00, "2018-02-14T12:27:18+00:00"),
>                             ("usr2", 99.00, "2018-02-15T11:27:18+00:00"),
>                             ("usr2", 156.00, "2018-02-22T11:27:18+00:00")
>                             ],
>                            ["user", "id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.show(truncate=False)
>
> def some_time_udf(i):
>     return str(i)
>
> udf = F.udf(some_time_udf, StringType())
> df.withColumn("day_part", udf(df.ts)).show(truncate=False)
> {code}
> Below is the output of the above code:
> {code:java}
> +----+-----+-------------------+-------------------+
> |user|id   |ts                 |day_part           |
> +----+-----+-------------------+-------------------+
> |usr1|17.0 |2018-02-10 15:27:18|2018-02-10 09:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|2018-02-11 06:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|2018-02-12 05:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|2018-02-13 09:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|2018-02-14 06:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|2018-02-15 05:27:18|
> |usr2|156.0|2018-02-22 11:27:18|2018-02-22 05:27:18|
> +----+-----+-------------------+-------------------+
> {code}
> The above output is incorrect: the ts and day_part columns don't show the same timestamps.
> Below is the output I would expect:
> {code:java}
> +----+-----+-------------------+-------------------+
> |user|id   |ts                 |day_part           |
> +----+-----+-------------------+-------------------+
> |usr1|17.0 |2018-02-10 15:27:18|2018-02-10 15:27:18|
> |usr1|13.0 |2018-02-11 12:27:18|2018-02-11 12:27:18|
> |usr1|25.0 |2018-02-12 11:27:18|2018-02-12 11:27:18|
> |usr1|20.0 |2018-02-13 15:27:18|2018-02-13 15:27:18|
> |usr1|17.0 |2018-02-14 12:27:18|2018-02-14 12:27:18|
> |usr2|99.0 |2018-02-15 11:27:18|2018-02-15 11:27:18|
> |usr2|156.0|2018-02-22 11:27:18|2018-02-22 11:27:18|
> +----+-----+-------------------+-------------------+
> {code}
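[Editor's note] One common way to sidestep the conversion in this simplified repro is to render the timestamp to a string on the JVM side, where spark.sql.session.timeZone=UTC is honored, so the Python worker never performs its own epoch-to-local-datetime conversion. A minimal sketch under that assumption (editor's example, not from the thread; reuses the df from the repro above):

{code:java}
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# date_format is evaluated on the JVM using spark.sql.session.timeZone (UTC
# here), so the UDF receives a plain 'yyyy-MM-dd HH:mm:ss' string instead of
# a timezone-shifted Python datetime.
def some_time_udf(s):
    return s  # s is already the UTC wall-clock string

udf = F.udf(some_time_udf, StringType())
df.withColumn("day_part",
              udf(F.date_format("ts", "yyyy-MM-dd HH:mm:ss"))
              ).show(truncate=False)
{code}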
[jira] [Updated] (SPARK-33863) Pyspark UDF wrongly changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Updated] (SPARK-33863) Pyspark UDF changes timestamps to UTC
[ https://issues.apache.org/jira/browse/SPARK-33863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nasir Ali updated SPARK-33863:
------------------------------
[jira] [Created] (SPARK-33863) Pyspark UDF changes timestamps to UTC
Nasir Ali created SPARK-33863:
------------------------------

             Summary: Pyspark UDF changes timestamps to UTC
                 Key: SPARK-33863
                 URL: https://issues.apache.org/jira/browse/SPARK-33863
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.1
         Environment: MAC/Linux
                      Standalone cluster / local machine
            Reporter: Nasir Ali

*Problem*:
If I create a new column using a UDF, the PySpark UDF changes the timestamps out of UTC time. I have used the following configs to let Spark know the timestamps are in UTC:
{code:java}
--conf spark.driver.extraJavaOptions=-Duser.timezone=UTC
--conf spark.executor.extraJavaOptions=-Duser.timezone=UTC
--conf spark.sql.session.timeZone=UTC
{code}
Below is a code snippet to reproduce the error:
{code:java}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import datetime

spark = SparkSession.builder.config("spark.sql.session.timeZone", "UTC").getOrCreate()

df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user", "id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.show(truncate=False)

def some_time_udf(i):
    tmp = ""
    if datetime.time(5, 0) <= i.time() < datetime.time(12, 0):
        tmp = "Morning -> " + str(i)
    elif datetime.time(12, 0) <= i.time() < datetime.time(17, 0):
        tmp = "Afternoon -> " + str(i)
    elif datetime.time(17, 0) <= i.time() < datetime.time(21, 0):
        tmp = "Evening -> " + str(i)
    elif datetime.time(21, 0) <= i.time():  # an upper bound of time(0, 0) would never match
        tmp = "Night -> " + str(i)
    elif datetime.time(0, 0) <= i.time() < datetime.time(5, 0):
        tmp = "Night -> " + str(i)
    return tmp

sometimeudf = F.udf(some_time_udf, StringType())
df.withColumn("day_part", sometimeudf("ts")).show(truncate=False)
{code}
I have concatenated the timestamps with the strings to show that PySpark passes the timestamps as UTC.
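[Editor's note] Since -Duser.timezone configures only the JVM, one plausible mitigation is to pin the operating-system timezone that the Python processes use. A sketch under that assumption (editor's example, untested against this report; time.tzset() is Unix-only, matching the MAC/Linux environment above, and spark.executorEnv.TZ sets an environment variable for executor processes):

{code:java}
import os
import time

# Pin the driver-side Python process to UTC before touching any datetimes,
# so datetime.fromtimestamp agrees with the UTC values Spark stores.
os.environ["TZ"] = "UTC"
time.tzset()  # Unix-only: re-reads TZ for subsequent time conversions

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "UTC")
         # Executor-side Python workers inherit this environment variable.
         .config("spark.executorEnv.TZ", "UTC")
         .getOrCreate())
{code}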
[jira] [Commented] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089034#comment-17089034 ] Nasir Ali commented on SPARK-30162: --- [~dongjoon] This bug has not been fixed. Please have a look at my previous comment and uploaded screenshots. Yes you added logs to debug but it doesn't perform filtering on partition key yet. > Add PushedFilters to metadata in Parquet DSv2 implementation > > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Assignee: Hyukjin Kwon >Priority: Minor > Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from > 2020-01-01 21-01-32.png > > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) 
and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information. > Warning: Ignoring non-spark config property: > java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8 > 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/12/09 07:05:36 WARN SparkConf: Note that
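One way to check whether the partition filter is really pruning the scan, independent of what explain() prints, is to benchmark the filtered read against a direct read of the single partition directory. A rough sketch, assuming the partitioned layout written in the issue description:
{code:java}
import time

def timed_count(frame):
    # crude wall-clock benchmark of a full count()
    t0 = time.time()
    n = frame.count()
    return n, round(time.time() - t0, 2)

df2 = spark.read.load("/home/cnali/data/")

# Filtered read: should be roughly as fast as the baseline below if the
# partition filter really prunes the scan.
print(timed_count(df2.filter("user == 'usr2'")))

# Baseline: read only the usr2 partition directory directly.
print(timed_count(spark.read.load("/home/cnali/data/user=usr2")))
{code}
A large gap between the two timings suggests the whole dataset is being scanned before filtering.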
[jira] [Comment Edited] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006548#comment-17006548 ] Nasir Ali edited comment on SPARK-30162 at 1/2/20 3:05 AM: --- This issue has not been fixed. It shows pushed filters but it does not actually push filters. For example, look at 2.4.* output; there it clearly shows 'user' as partition filter but not in 3.* version. Have a look at spark ui sql graph. On a bigger dataset, spark 2.4.* takes less than a second to show one row. Whereas, spark 3.* takes way too long to produce the same result. Code is exactly same (load parquet, filter based on partition key user) was (Author: nasirali): This issue has not been fixed. It shows pushed filters but it does not actually push filters. For example, look at 2.4.* output; there it clearly shows 'user' as partition filter but not in 3.* version. Have a look at spark ui sql graph. On a bigger dataset, spark 2.4.* takes less than a second to show one row. Whereas, spark 3.* takes 10+ seconds to produce the same result. Code is exactly same (load parquet, filter based on partition key user) > Add PushedFilters to metadata in Parquet DSv2 implementation > > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 3.0.0 > > Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from > 2020-01-01 21-01-32.png > > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. 
Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug.
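Regarding the truncated explain() output mentioned above: the full plan string can usually be pulled from the JVM QueryExecution object behind the DataFrame. A sketch, with the caveat that _jdf is a PySpark-internal handle and may change between releases:
{code:java}
# Calls Dataset.queryExecution on the JVM side and prints its full
# toString, which is not subject to explain()'s length limit.
df2 = spark.read.load("/home/cnali/data/")
print(df2.filter("user == 'usr2'")._jdf.queryExecution().toString())
{code}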
[jira] [Comment Edited] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006548#comment-17006548 ] Nasir Ali edited comment on SPARK-30162 at 1/2/20 3:04 AM: --- This issue has not been fixed. It shows pushed filters but it does not actually push filters. For example, look at 2.4.* output; there it clearly shows 'user' as partition filter but not in 3.* version. Have a look at spark ui sql graph. On a bigger dataset, spark 2.4.* takes less than a second to show one row. Whereas, spark 3.* takes 10+ seconds to produce the same result. Code is exactly same (load parquet, filter based on partition key user) was (Author: nasirali): This issue has not been fixed. It shows pushed filters but it does not actually push filters. For example, look at 2.4.* output; there it clearly shows 'user' as partition filter but not in 3.* version. > Add PushedFilters to metadata in Parquet DSv2 implementation > > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 3.0.0 > > Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from > 2020-01-01 21-01-32.png > > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter 
(isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information.
[jira] [Updated] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162: -- Attachment: Screenshot from 2020-01-01 21-01-32.png Screenshot from 2020-01-01 21-01-18.png > Add PushedFilters to metadata in Parquet DSv2 implementation > > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 3.0.0 > > Attachments: Screenshot from 2020-01-01 21-01-18.png, Screenshot from > 2020-01-01 21-01-32.png > > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. 
> > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information. > Warning: Ignoring non-spark config property: > java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8 > 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be > overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS
[jira] [Reopened] (SPARK-30162) Add PushedFilters to metadata in Parquet DSv2 implementation
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali reopened SPARK-30162: --- This issue has not been fixed. It shows pushed filters but it does not actually push filters. For example, look at 2.4.* output; there it clearly shows 'user' as partition filter but not in 3.* version. > Add PushedFilters to metadata in Parquet DSv2 implementation > > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Assignee: Hyukjin Kwon >Priority: Minor > Fix For: 3.0.0 > > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. 
> > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information. > Warning: Ignoring non-spark config property: > java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8 > 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be > overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in >
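If the DSv2 parquet path is the culprit, Spark 3.0 also has an escape hatch: spark.sql.sources.useV1SourceList routes the listed formats through the old DSv1 FileScan code path, whose plan lists PartitionFilters/PushedFilters the way 2.4 did. A sketch, assuming that config is available in the build under test:
{code:java}
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         # Assumption: this config exists in the 3.0 build being tested;
         # it makes parquet use the V1 FileScan path instead of BatchScan.
         .config("spark.sql.sources.useV1SourceList", "parquet")
         .getOrCreate())

spark.read.load("/home/cnali/data/").filter("user == 'usr2'").explain(True)
{code}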
[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995815#comment-16995815 ] Nasir Ali edited comment on SPARK-28502 at 12/13/19 6:45 PM: ---
{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import (StructField, StructType, StringType, FloatType,
                               MapType, TimestampType, IntegerType, DateType)

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])
    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column; it throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in returnType
    to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, in to_arrow_type
    raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to Arrow

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in returnType
    "%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: StructType(List(StructField(timestamp,TimestampType,true),StructField(window,StructType(List(StructField(start,TimestampType,true),StructField(end,TimestampType,true))),true),StructField(some_val,StringType,true))) is not supported
{code}
However, if I manually run *to_arrow_schema(schema)*, it works fine and there is no exception. [https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L139]
{code:java}
from pyspark.sql.types import to_arrow_schema
to_arrow_schema(schema)
{code}

was (Author: nasirali):
{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import (StructField, StructType, StringType, FloatType,
                               MapType, TimestampType, IntegerType, DateType)

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])
    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column; it throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in returnType
    to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, in to_arrow_type
    raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to Arrow

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in returnType
    "%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map
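Until nested StructType is supported in the Arrow conversion, one workaround is to avoid the window struct altogether: compute an epoch-aligned bucket start as a flat timestamp column before grouping, so no struct crosses the Arrow boundary in either direction. A minimal sketch, assuming a df(id: double, ts: timestamp) like the one in the issue description and that epoch-aligned 15-day buckets are an acceptable stand-in for F.window("ts", "15 days"):
{code:java}
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (StructType, StructField, DoubleType,
                               TimestampType)

# Epoch-aligned 15-day bucket kept as a flat timestamp column.
secs = F.col("ts").cast("long")
df2 = df.withColumn("win_start",
                    (secs - secs % (15 * 86400)).cast("timestamp"))

out_schema = StructType([
    StructField("id", DoubleType()),
    StructField("win_start", TimestampType()),
])

@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def per_window(pdf):
    # some computation on the group; echo the flat columns back
    return pdf[["id", "win_start"]]

df2.groupby("id", "win_start").apply(per_window).show()
{code}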
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995815#comment-16995815 ] Nasir Ali commented on SPARK-28502: ---
{code:java}
import numpy as np
import pandas as pd
import json
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN
from pyspark.sql.types import (StructField, StructType, StringType, FloatType,
                               MapType, TimestampType, IntegerType, DateType)

schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("window", StructType([
        StructField("start", TimestampType()),
        StructField("end", TimestampType())])),
    StructField("some_val", StringType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_win_col(key, user_data):
    all_vals = []
    for index, row in user_data.iterrows():
        all_vals.append([row["timestamp"], key[2], "tesss"])
    return pd.DataFrame(all_vals, columns=['timestamp', 'window', 'some_val'])
{code}
I am not even able to manually return the window column; it throws this error:
{code:java}
Traceback (most recent call last):
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 139, in returnType
    to_arrow_type(self._returnType_placeholder)
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/types.py", line 1641, in to_arrow_type
    raise TypeError("Nested StructType not supported in conversion to Arrow")
TypeError: Nested StructType not supported in conversion to Arrow

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 79, in _create_udf
    return udf_obj._wrapped()
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 234, in _wrapped
    wrapper.returnType = self.returnType
  File "/usr/local/spark-3.0.0-preview/python/pyspark/sql/udf.py", line 143, in returnType
    "%s is not supported" % str(self._returnType_placeholder))
NotImplementedError: Invalid returnType with grouped map Pandas UDFs: StructType(List(StructField(timestamp,TimestampType,true),StructField(window,StructType(List(StructField(start,TimestampType,true),StructField(end,TimestampType,true))),true),StructField(some_val,StringType,true))) is not supported
{code}
However, if I manually run *to_arrow_schema(schema)*, it works fine and there is no exception.
{code:java}
from pyspark.sql.types import to_arrow_schema
to_arrow_schema(schema)
{code}
> Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting.
> > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15
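The quoted snippet above is cut off. For reference, a minimal sketch of the built-in aggregation path that the comment says works (the mean over id is an assumed placeholder aggregate, not the reporter's actual computation):
{code:java}
from pyspark.sql import functions as F

# Built-in aggregates handle the window struct fine; only the
# Arrow/pandas conversion path rejects it.
df.groupby("id", F.window("ts", "15 days")).agg(F.mean("id")).show(truncate=False)
{code}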
[jira] [Commented] (SPARK-22947) SPIP: as-of join in Spark SQL
[ https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16993981#comment-16993981 ] Nasir Ali commented on SPARK-22947: --- [~icexelloss] Is there any update on this issue? Or is there any alternate in Spark to perform such task now? > SPIP: as-of join in Spark SQL > - > > Key: SPARK-22947 > URL: https://issues.apache.org/jira/browse/SPARK-22947 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.1 >Reporter: Li Jin >Priority: Major > Attachments: SPIP_ as-of join in Spark SQL (1).pdf > > > h2. Background and Motivation > Time series analysis is one of the most common analysis on financial data. In > time series analysis, as-of join is a very common operation. Supporting as-of > join in Spark SQL will allow many use cases of using Spark SQL for time > series analysis. > As-of join is “join on time” with inexact time matching criteria. Various > library has implemented asof join or similar functionality: > Kdb: https://code.kx.com/wiki/Reference/aj > Pandas: > http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof > R: This functionality is called “Last Observation Carried Forward” > https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf > JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin > Flint: https://github.com/twosigma/flint#temporal-join-functions > This proposal advocates introducing new API in Spark SQL to support as-of > join. > h2. Target Personas > Data scientists, data engineers > h2. Goals > * New API in Spark SQL that allows as-of join > * As-of join of multiple table (>2) should be performant, because it’s very > common that users need to join multiple data sources together for further > analysis. > * Define Distribution, Partitioning and shuffle strategy for ordered time > series data > h2. Non-Goals > These are out of scope for the existing SPIP, should be considered in future > SPIP as improvement to Spark’s time series analysis ability: > * Utilize partition information from data source, i.e, begin/end of each > partition to reduce sorting/shuffling > * Define API for user to implement asof join time spec in business calendar > (i.e. lookback one business day, this is very common in financial data > analysis because of market calendars) > * Support broadcast join > h2. Proposed API Changes > h3. TimeContext > TimeContext is an object that defines the time scope of the analysis, it has > begin time (inclusive) and end time (exclusive). User should be able to > change the time scope of the analysis (i.e, from one month to five year) by > just changing the TimeContext. > To Spark engine, TimeContext is a hint that: > can be used to repartition data for join > serve as a predicate that can be pushed down to storage layer > Time context is similar to filtering time by begin/end, the main difference > is that time context can be expanded based on the operation taken (see > example in as-of join). > Time context example: > {code:java} > TimeContext timeContext = TimeContext("20160101", "20170101") > {code} > h3. asofJoin > h4. User Case A (join without key) > Join two DataFrames on time, with one day lookback: > {code:java} > TimeContext timeContext = TimeContext("20160101", "20170101") > dfA = ... > dfB = ... 
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day") > result = dfA.asofJoin(dfB, joinSpec) > {code} > Example input/output: > {code:java} > dfA: > time, quantity > 20160101, 100 > 20160102, 50 > 20160104, -50 > 20160105, 100 > dfB: > time, price > 20151231, 100.0 > 20160104, 105.0 > 20160105, 102.0 > output: > time, quantity, price > 20160101, 100, 100.0 > 20160102, 50, null > 20160104, -50, 105.0 > 20160105, 100, 102.0 > {code} > Note row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This > is an important illustration of the time context - it is able to expand the > context to 20151231 on dfB because of the 1 day lookback. > h4. Use Case B (join with key) > To join on time and another key (for instance, id), we use “by” to specify > the key. > {code:java} > TimeContext timeContext = TimeContext("20160101", "20170101") > dfA = ... > dfB = ... > JoinSpec joinSpec = > JoinSpec(timeContext).on("time").by("id").tolerance("-1day") > result = dfA.asofJoin(dfB, joinSpec) > {code} > Example input/output: > {code:java} > dfA: > time, id, quantity > 20160101, 1, 100 > 20160101, 2, 50 > 20160102, 1, -50 > 20160102, 2, 50 > dfB: > time, id, price > 20151231, 1, 100.0 > 20150102, 1, 105.0 > 20150102, 2, 195.0 > Output: > time, id, quantity, price > 20160101, 1, 100, 100.0 > 20160101, 2, 50, null > 20160102, 1, -50, 105.0 > 20160102, 2, 50, 195.0 > {code} > h2. Optional Design Sketch > h3. Implementation
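Absent a native as-of join, one common emulation in plain Spark SQL is a range join plus a "latest match wins" window. A sketch for Use Case A above (1-day lookback), with hypothetical dfA(time, quantity) and dfB(time, price) shaped like the example, date-typed time columns, and the assumption that (time, quantity) uniquely identifies a dfA row:
{code:java}
from pyspark.sql import functions as F, Window

# Rename dfB's time column so the join output is unambiguous.
b = dfB.select(F.col("time").alias("b_time"), "price")

# Range join: dfB rows at or before dfA.time, at most one day back.
cond = (F.col("b_time") <= F.col("time")) & \
       (F.col("b_time") >= F.date_sub(F.col("time"), 1))
joined = dfA.join(b, cond, "left")

# As-of semantics: keep only the most recent matching dfB row.
w = Window.partitionBy("time", "quantity").orderBy(F.col("b_time").desc_nulls_last())
result = (joined.withColumn("rn", F.row_number().over(w))
                .filter("rn = 1")
                .select("time", "quantity", "price"))
result.show()
{code}
This is a non-equi join, so it does not get the sort-merge treatment a native as-of join would; it is a functional workaround, not a performant replacement.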
[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991573#comment-16991573 ] Nasir Ali commented on SPARK-30162: --- [~aman_omer] added in my question > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > > {code:java} > // pyspark 3 shell output > $ pyspark > Python 3.6.8 (default, Aug 7 2019, 17:28:10) > [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux > Type "help", "copyright", "credits" or "license" for more information. 
> Warning: Ignoring non-spark config property: > java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8 > 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > Setting default log level to "WARN". > To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use > setLogLevel(newLevel). > 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be > overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in > mesos/standalone/kubernetes and LOCAL_DIRS in YARN). > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ >/__ / .__/\_,_/_/ /_/\_\ version 3.0.0-preview > /_/Using
[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162: -- Description: Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method is also different, so it is hard to tell in 3.0 whether filters were pushed down or not. The code below reproduces the bug:
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user", "id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")

df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct
{code}
I have tested it on a much larger dataset. Spark 3.0 tries to load the whole data and then apply the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview did not.

Minor: in Spark 3.0 the "explain()" output is truncated (maybe a fixed length?) and it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work.
{code:java}
// pyspark 3 shell output
$ pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Warning: Ignoring non-spark config property: java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
      /_/

Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
SparkSession available as 'spark'.
{code}
{code:java}
// pyspark 2.4.4 shell output
pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
2019-12-09 07:09:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ /
[jira] [Commented] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16990741#comment-16990741 ] Nasir Ali commented on SPARK-30162: --- No, I only use pyspark shell. > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and > it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't > work. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162: -- Description: Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" method is different. It is hard to debug in 3.0 whether filters were pushed down or not. Below code could reproduce the bug: {code:java} // code placeholder df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), ("usr1",13.00, "2018-03-11T12:27:18+00:00"), ("usr1",25.00, "2018-03-12T11:27:18+00:00"), ("usr1",20.00, "2018-03-13T15:27:18+00:00"), ("usr1",17.00, "2018-03-14T12:27:18+00:00"), ("usr2",99.00, "2018-03-15T11:27:18+00:00"), ("usr2",156.00, "2018-03-22T11:27:18+00:00"), ("usr2",17.00, "2018-03-31T11:27:18+00:00"), ("usr2",25.00, "2018-03-15T11:27:18+00:00"), ("usr2",25.00, "2018-03-16T11:27:18+00:00") ], ["user","id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) {code} {code:java} // Spark 2.4 output == Parsed Logical Plan == 'Filter ('user = usr2) +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#40 = usr2) +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == Filter (isnotnull(user#40) && (user#40 = usr2)) +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct{code} {code:java} // Spark 3.0.0-preview output == Parsed Logical Plan == 'Filter ('user = usr2) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#2 = usr2) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized Logical Plan == Filter (isnotnull(user#2) AND (user#2 = usr2)) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan == *(1) Project [id#0, ts#1, user#2] +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) +- *(1) ColumnarToRow +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct {code} I have tested it on much larger dataset. Spark 3.0 tries to load whole data and then apply filter. Whereas Spark 2.4 push down the filter. Above output shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work. was: Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" method is different. It is hard to debug in 3.0 whether filters were pushed down or not. 
Below code could reproduce the bug: {code:java} // code placeholder df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), ("usr1",13.00, "2018-03-11T12:27:18+00:00"), ("usr1",25.00, "2018-03-12T11:27:18+00:00"), ("usr1",20.00, "2018-03-13T15:27:18+00:00"), ("usr1",17.00, "2018-03-14T12:27:18+00:00"), ("usr2",99.00, "2018-03-15T11:27:18+00:00"), ("usr2",156.00, "2018-03-22T11:27:18+00:00"), ("usr2",17.00, "2018-03-31T11:27:18+00:00"), ("usr2",25.00, "2018-03-15T11:27:18+00:00"), ("usr2",25.00, "2018-03-16T11:27:18+00:00") ], ["user","id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) df.write.partitionBy("user").parquet("/home/nasir/data/")df2 = spark.read.load("/home/nasir/data/")df2.filter("user=='usr2'").explain(True) {code} {code:java} // Spark 2.4 output == Parsed Logical Plan == 'Filter ('user = usr2) +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#40 = usr2) +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == Filter (isnotnull(user#40) && (user#40 = usr2)) +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data],
[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162: -- Description: Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" method is different. It is hard to debug in 3.0 whether filters were pushed down or not. Below code could reproduce the bug: {code:java} // code placeholder df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), ("usr1",13.00, "2018-03-11T12:27:18+00:00"), ("usr1",25.00, "2018-03-12T11:27:18+00:00"), ("usr1",20.00, "2018-03-13T15:27:18+00:00"), ("usr1",17.00, "2018-03-14T12:27:18+00:00"), ("usr2",99.00, "2018-03-15T11:27:18+00:00"), ("usr2",156.00, "2018-03-22T11:27:18+00:00"), ("usr2",17.00, "2018-03-31T11:27:18+00:00"), ("usr2",25.00, "2018-03-15T11:27:18+00:00"), ("usr2",25.00, "2018-03-16T11:27:18+00:00") ], ["user","id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) df.write.partitionBy("user").parquet("/home/nasir/data/")df2 = spark.read.load("/home/nasir/data/")df2.filter("user=='usr2'").explain(True) {code} {code:java} // Spark 2.4 output == Parsed Logical Plan == 'Filter ('user = usr2) +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#40 = usr2) +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == Filter (isnotnull(user#40) && (user#40 = usr2)) +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct{code} {code:java} // Spark 3.0.0-preview output == Parsed Logical Plan == 'Filter ('user = usr2) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#2 = usr2) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized Logical Plan == Filter (isnotnull(user#2) AND (user#2 = usr2)) +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan == *(1) Project [id#0, ts#1, user#2] +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) +- *(1) ColumnarToRow +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct {code} I have tested it on much larger dataset. Spark 3.0 tries to load whole data and then apply filter. Whereas Spark 2.4 push down the filter. Above output shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work. was: Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" method is different. It is hard to debug in 3.0 whether filters were pushed down or not. 
Below code could reproduce the bug: {code:java} // code placeholder df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), ("usr1",13.00, "2018-03-11T12:27:18+00:00"), ("usr1",25.00, "2018-03-12T11:27:18+00:00"), ("usr1",20.00, "2018-03-13T15:27:18+00:00"), ("usr1",17.00, "2018-03-14T12:27:18+00:00"), ("usr2",99.00, "2018-03-15T11:27:18+00:00"), ("usr2",156.00, "2018-03-22T11:27:18+00:00"), ("usr2",17.00, "2018-03-31T11:27:18+00:00"), ("usr2",25.00, "2018-03-15T11:27:18+00:00"), ("usr2",25.00, "2018-03-16T11:27:18+00:00") ], ["user","id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) df.write.partitionBy("user").parquet("/home/nasir/data/")df2 = spark.read.load("/home/nasir/data/")df2.filter("user=='usr2'").explain(True) {code} {code:java} // Spark 2.4 output == Parsed Logical Plan == 'Filter ('user = usr2) +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#40 = usr2) +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == Filter (isnotnull(user#40) && (user#40 = usr2)) +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data],
[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162:
------------------------------
Description:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method is also different, which makes it hard to tell in 3.0 whether filters were pushed down or not. The code below reproduces the bug:

{code:java}
df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")],
                           ["user", "id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct
{code}
I have tested it on a much larger dataset as well: Spark 3.0 tries to load the whole dataset and then apply the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview did not.

Minor: in Spark 3.0 the output is truncated (maybe to a fixed length?) and it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work.

was: Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" method is different. It is hard to debug in 3.0 whether filters were pushed down or not.
Below code could reproduce the bug: {code:java} // code placeholder df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), ("usr1",13.00, "2018-03-11T12:27:18+00:00"), ("usr1",25.00, "2018-03-12T11:27:18+00:00"), ("usr1",20.00, "2018-03-13T15:27:18+00:00"), ("usr1",17.00, "2018-03-14T12:27:18+00:00"), ("usr2",99.00, "2018-03-15T11:27:18+00:00"), ("usr2",156.00, "2018-03-22T11:27:18+00:00"), ("usr2",17.00, "2018-03-31T11:27:18+00:00"), ("usr2",25.00, "2018-03-15T11:27:18+00:00"), ("usr2",25.00, "2018-03-16T11:27:18+00:00") ], ["user","id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) {code} {code:java} // Spark 2.4 output == Parsed Logical Plan == 'Filter ('user = usr2) +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == id: double, ts: timestamp, user: string Filter (user#40 = usr2) +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == Filter (isnotnull(user#40) && (user#40 = usr2)) +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1,
[jira] [Updated] (SPARK-30162) Filter is not being pushed down for Parquet files
[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-30162: -- Summary: Filter is not being pushed down for Parquet files (was: Filters is not being pushed down for Parquet files) > Filter is not being pushed down for Parquet files > - > > Key: SPARK-30162 > URL: https://issues.apache.org/jira/browse/SPARK-30162 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.0.0 > Environment: pyspark 3.0 preview > Ubuntu/Centos > pyarrow 0.14.1 >Reporter: Nasir Ali >Priority: Major > > Filters are not pushed down in Spark 3.0 preview. Also the output of > "explain" method is different. It is hard to debug in 3.0 whether filters > were pushed down or not. Below code could reproduce the bug: > > {code:java} > // code placeholder > df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"), > ("usr1",13.00, "2018-03-11T12:27:18+00:00"), > ("usr1",25.00, "2018-03-12T11:27:18+00:00"), > ("usr1",20.00, "2018-03-13T15:27:18+00:00"), > ("usr1",17.00, "2018-03-14T12:27:18+00:00"), > ("usr2",99.00, "2018-03-15T11:27:18+00:00"), > ("usr2",156.00, "2018-03-22T11:27:18+00:00"), > ("usr2",17.00, "2018-03-31T11:27:18+00:00"), > ("usr2",25.00, "2018-03-15T11:27:18+00:00"), > ("usr2",25.00, "2018-03-16T11:27:18+00:00") > ], >["user","id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = > spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True) > {code} > {code:java} > // Spark 2.4 output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#40 = usr2) > +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan == > Filter (isnotnull(user#40) && (user#40 = usr2)) > +- Relation[id#38,ts#39,user#40] parquet== Physical Plan == > *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, > Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, > PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], > ReadSchema: struct{code} > {code:java} > // Spark 3.0.0-preview output > == Parsed Logical Plan == > 'Filter ('user = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed > Logical Plan == > id: double, ts: timestamp, user: string > Filter (user#2 = usr2) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized > Logical Plan == > Filter (isnotnull(user#2) AND (user#2 = usr2)) > +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical > Plan == > *(1) Project [id#0, ts#1, user#2] > +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2)) >+- *(1) ColumnarToRow > +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: > InMemoryFileIndex[file:/home/cnali/data], ReadSchema: > struct > {code} > I have tested it on much larger dataset. Spark 3.0 tries to load whole data > and then apply filter. Whereas Spark 2.4 push down the filter. Above output > shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview. > > Minor: in Spark 3.0 output is truncated (maybe fixed length?) and it's hard > to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work. > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-30162) Filters is not being pushed down for Parquet files
Nasir Ali created SPARK-30162:
-------------------------------------

             Summary: Filters is not being pushed down for Parquet files
                 Key: SPARK-30162
                 URL: https://issues.apache.org/jira/browse/SPARK-30162
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
         Environment: pyspark 3.0 preview
Ubuntu/Centos
pyarrow 0.14.1
            Reporter: Nasir Ali

Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method is also different, which makes it hard to tell in 3.0 whether filters were pushed down or not. The code below reproduces the bug:

{code:java}
df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")],
                           ["user", "id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

df.write.partitionBy("user").parquet("/home/cnali/data/")
df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct
{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct
{code}
I have tested it on a much larger dataset as well: Spark 3.0 tries to load the whole dataset and then apply the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview did not.

Minor: in Spark 3.0 the output is truncated (maybe to a fixed length?) and it's hard to debug. spark.sql.orc.cache.stripe.details.size=1 doesn't work.

-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
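[Editorial note] Since part of the complaint above is that the DSv2 explain output hides whether pruning happened, a quick way to make it visible again on Spark 3.0 final and later is the "formatted" explain mode, which prints the PartitionFilters/PushedFilters of each scan node. A minimal sketch, assuming the partitioned dataset written by the repro above (the path is taken from it; `mode="formatted"` does not exist in 2.4):

{code:java}
df2 = spark.read.parquet("/home/nasir/data/")

# Spark 3.0+ formatted explain lists every scan node together with its
# PartitionFilters / PushedFilters, so partition pruning is easy to spot.
df2.filter("user == 'usr2'").explain(mode="formatted")

# Coarser cross-check: a pruned scan reads only the usr2 partition, which
# shows up as lower input metrics in the SQL tab of the Spark UI.
{code}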
[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16968703#comment-16968703 ] Nasir Ali edited comment on SPARK-28502 at 11/6/19 9:17 PM:

[~bryanc] If I perform any agg (e.g., _df.groupBy("id", F.window("ts", "15 days")).agg(\{"id":"avg"}).show()_ ) on grouped data, PySpark returns the key (e.g., id, window) along with the avg for each group. However, in the above example, when the udf returns the struct, it does not automatically return the key; I have to manually add the window to the returned dataframe. Is there a way to automatically concatenate the key to the results of the udf?

was (Author: nasirali): [~bryanc] If I perform any agg (e.g., avg) on grouped data, pyspark returns me the key (e.g., window etc.) with the avg for each row. However, in the above example, when udf returns the struct, it does not automatically return the key. I have to manually add window to returning dataframe. Is there a way to automatically concatenate results of udf with key?

> Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
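[Editorial note] On the question above: in Spark 2.4.x a GROUPED_MAP pandas_udf receives the whole group, key columns included, and there is no option to have the key appended to the result automatically; the usual pattern is to copy the key into the returned frame yourself. A minimal sketch under that assumption, reusing the df from the issue's repro (the function name and output schema are illustrative, not from this thread):

{code:java}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# The output schema carries the grouping key explicitly.
@pandas_udf("id double, avg_id double", PandasUDFType.GROUPED_MAP)
def avg_per_id(pdf):
    # pdf holds every column of the group, so the key can simply be copied.
    return pd.DataFrame({"id": [pdf["id"].iloc[0]],
                         "avg_id": [pdf["id"].mean()]})

df.groupby("id").apply(avg_per_id).show()
{code}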
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16968703#comment-16968703 ] Nasir Ali commented on SPARK-28502: --- [~bryanc] If I perform any agg (e.g., avg) on grouped data, pyspark returns me the key (e.g., window etc.) with the avg for each row. However, in the above example, when udf returns the struct, it does not automatically return the key. I have to manually add window to returning dataframe. Is there a way to automatically concatenate results of udf with key? > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali reopened SPARK-28502: --- > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967977#comment-16967977 ] Nasir Ali edited comment on SPARK-28502 at 11/6/19 12:38 AM:
-

[~bryanc] Sorry, I had to remove my previous comment as I was in the middle of debugging this issue. I found the culprit package: my code works fine with pyarrow==0.14.1, but with the latest pyarrow release (0.15.1) my example code throws the following exception. Please let me know if you need more information.

{code:java}
19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13)
java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/05 18:23:17 ERROR TaskSetManager: Task 13 in stage 5.0 failed 1 times; aborting job
{code}

was (Author: nasirali): Sorry I had to remove my previous comment as I was in the middle of debugging this issue. I found the culprit package. My code works all fine with pyarrow==0.14.1. However, with the latest pyarrow release (0.15.1), my example code throws following exception. Please let me know if you need more information.
{code:java} // code placeholder 19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13) java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178) at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169) at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89) at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967977#comment-16967977 ] Nasir Ali commented on SPARK-28502:
---

Sorry, I had to remove my previous comment as I was in the middle of debugging this issue. I found the culprit package: my code works fine with pyarrow==0.14.1, but with the latest pyarrow release (0.15.1) my example code throws the following exception. Please let me know if you need more information.

{code:java}
19/11/05 18:23:17 ERROR Executor: Exception in task 13.0 in stage 5.0 (TID 13)
java.lang.IllegalArgumentException
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
    at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:547)
    at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
    at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
    at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:178)
    at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:169)
    at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
    at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:337)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/11/05 18:23:17 ERROR TaskSetManager: Task 13 in stage 5.0 failed 1 times; aborting job
{code}

> Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days >
window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema,
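[Editorial note] The java.lang.IllegalArgumentException in ByteBuffer.allocate above is the usual symptom of the Arrow 0.15 IPC format change: pyarrow >= 0.15 writes a stream that older Arrow Java readers misinterpret as an invalid length. Where upgrading the JVM side is not an option, the mitigations documented by Spark for this situation are sketched below (no Spark code change involved):

{code:java}
# Sketch of the documented mitigations, not specific to this ticket:
#
# 1) Pin the Python side below the format change:
#      pip install 'pyarrow<0.15'
#
# 2) Or let pyarrow >= 0.15 emit the legacy IPC format; the variable has
#    to reach the Python workers, e.g. via conf/spark-env.sh:
#      ARROW_PRE_0_15_IPC_FORMAT=1
{code}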
[jira] [Resolved] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali resolved SPARK-28502. --- Resolution: Fixed > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-28502: -- Comment: was deleted (was: [~bryanc] I have tested the same code on newly released spark version ([v3.0.0-preview-rc2|https://github.com/apache/spark/releases/tag/v3.0.0-preview-rc2]) on git. However, it failed with the same exception mentioned in earlier bug report) > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Reopened] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali reopened SPARK-28502: --- [~bryanc] I have tested the same code on newly released spark version ([v3.0.0-preview-rc2|https://github.com/apache/spark/releases/tag/v3.0.0-preview-rc2]) on git. However, it failed with the same exception mentioned in earlier bug report > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16948832#comment-16948832 ] Nasir Ali commented on SPARK-28502:
---

[~bryanc] I tested it and it works fine with the master branch. Is there any expected release date for version 3? Or could this bug fix be integrated into the next release?

> Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > Fix For: 3.0.0 > > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16935065#comment-16935065 ] Nasir Ali commented on SPARK-28502: --- any update? > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894238#comment-16894238 ] Nasir Ali commented on SPARK-28502:
---

I tried to set the timezone to UTC as suggested by [~icexelloss], but it didn't solve the problem. It throws the following error (the same error, but with tz=UTC):

{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=UTC], end: timestamp[us, tz=UTC]>
{code}

I think this is a conversion error, maybe? Below is the complete trace:

{code:java}
df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 5.0 failed 1 times, most recent failure: Lost task 13.0 in stage 5.0 (TID 32, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 283, in dump_stream
    for series in iterator:
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 301, in load_stream
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 301, in <listcomp>
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in arrow_to_pandas
    s = _check_series_convert_date(s, from_arrow_type(arrow_column.type))
  File "/home/ali/spark/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 1672, in from_arrow_type
    raise TypeError("Unsupported type in conversion from Arrow: " + str(at))
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=UTC], end: timestamp[us, tz=UTC]>
{code}

> Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting.
> > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15
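[Editorial note] The timezone suggestion referenced above presumably amounts to the session setting sketched here (a reconstruction, not quoted from the thread). As the comment reports, it only changes the timezone embedded in the Arrow struct type (America/Chicago becomes UTC); the struct conversion itself still fails on Spark 2.4.x:

{code:java}
from pyspark.sql import SparkSession

# Hypothetical reconstruction of the suggested configuration.
spark = (SparkSession.builder
         .config("spark.sql.session.timeZone", "UTC")
         .getOrCreate())
{code}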
[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-28502: -- Issue Type: Bug (was: Question) > Error with struct conversion while using pandas_udf > --- > > Key: SPARK-28502 > URL: https://issues.apache.org/jira/browse/SPARK-28502 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.4.3 > Environment: OS: Ubuntu > Python: 3.6 >Reporter: Nasir Ali >Priority: Minor > > What I am trying to do: Group data based on time intervals (e.g., 15 days > window) and perform some operations on dataframe using (pandas) UDFs. I don't > know if there is a better/cleaner way to do it. > Below is the sample code that I tried and error message I am getting. > > {code:java} > df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), > (13.00, "2018-03-11T12:27:18+00:00"), > (25.00, "2018-03-12T11:27:18+00:00"), > (20.00, "2018-03-13T15:27:18+00:00"), > (17.00, "2018-03-14T12:27:18+00:00"), > (99.00, "2018-03-15T11:27:18+00:00"), > (156.00, "2018-03-22T11:27:18+00:00"), > (17.00, "2018-03-31T11:27:18+00:00"), > (25.00, "2018-03-15T11:27:18+00:00"), > (25.00, "2018-03-16T11:27:18+00:00") > ], >["id", "ts"]) > df = df.withColumn('ts', df.ts.cast('timestamp')) > schema = StructType([ > StructField("id", IntegerType()), > StructField("ts", TimestampType()) > ]) > @pandas_udf(schema, PandasUDFType.GROUPED_MAP) > def some_udf(df): > # some computation > return df > df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() > {code} > This throws following exception: > {code:java} > TypeError: Unsupported type in conversion from Arrow: struct timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]> > {code} > > However, if I use builtin agg method then it works all fine. For example, > {code:java} > df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) > {code} > Output > {code:java} > +-+--+---+ > |id |window|avg(id)| > +-+--+---+ > |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | > |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | > |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | > |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | > |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | > |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | > |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | > +-+--+---+ > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-28502:
------------------------------
Description:
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner way to do it. Below is the sample code that I tried and the error message I am getting.

{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
                                   (13.00, "2018-03-11T12:27:18+00:00"),
                                   (25.00, "2018-03-12T11:27:18+00:00"),
                                   (20.00, "2018-03-13T15:27:18+00:00"),
                                   (17.00, "2018-03-14T12:27:18+00:00"),
                                   (99.00, "2018-03-15T11:27:18+00:00"),
                                   (156.00, "2018-03-22T11:27:18+00:00"),
                                   (17.00, "2018-03-31T11:27:18+00:00"),
                                   (25.00, "2018-03-15T11:27:18+00:00"),
                                   (25.00, "2018-03-16T11:27:18+00:00")],
                                  ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
    StructField("id", IntegerType()),
    StructField("ts", TimestampType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}

This throws the following exception:

{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method then it all works fine. For example,

{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}

Output

{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}

was: What I am trying to do: Group data based on time intervals (e.g., 15 days window) and perform some operations on dataframe using (pandas) UDFs. I don't know if there is a better/cleaner solution to do it. Below is the sample code that I tried and error message I am getting. {code:java} df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), (13.00, "2018-03-11T12:27:18+00:00"), (25.00, "2018-03-12T11:27:18+00:00"), (20.00, "2018-03-13T15:27:18+00:00"), (17.00, "2018-03-14T12:27:18+00:00"), (99.00, "2018-03-15T11:27:18+00:00"), (156.00, "2018-03-22T11:27:18+00:00"), (17.00, "2018-03-31T11:27:18+00:00"), (25.00, "2018-03-15T11:27:18+00:00"), (25.00, "2018-03-16T11:27:18+00:00") ], ["id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) schema = StructType([ StructField("id", IntegerType()), StructField("ts", TimestampType()) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def some_udf(df): # some computation return df df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() {code} This throws following exception: {code:java} TypeError: Unsupported type in conversion from Arrow: struct {code} However, if I use builtin agg method then it works all fine.
For example, {code:java} df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) {code} Output {code:java} +-+--+---+ |id |window|avg(id)| +-+--+---+ |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | +-+--+---+ {code} > Error with struct conversion while using pandas_udf > --- > > Key:
[jira] [Updated] (SPARK-28502) Error with struct conversion while using pandas_udf
[ https://issues.apache.org/jira/browse/SPARK-28502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nasir Ali updated SPARK-28502:
------------------------------
Description:
What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I don't know if there is a better/cleaner solution to do it. Below is the sample code that I tried and the error message I am getting.

{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
                                   (13.00, "2018-03-11T12:27:18+00:00"),
                                   (25.00, "2018-03-12T11:27:18+00:00"),
                                   (20.00, "2018-03-13T15:27:18+00:00"),
                                   (17.00, "2018-03-14T12:27:18+00:00"),
                                   (99.00, "2018-03-15T11:27:18+00:00"),
                                   (156.00, "2018-03-22T11:27:18+00:00"),
                                   (17.00, "2018-03-31T11:27:18+00:00"),
                                   (25.00, "2018-03-15T11:27:18+00:00"),
                                   (25.00, "2018-03-16T11:27:18+00:00")],
                                  ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
    StructField("id", IntegerType()),
    StructField("ts", TimestampType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}

This throws the following exception:

{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method then it all works fine. For example,

{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}

Output

{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}

was: What I am trying to do: Group data based on time intervals (e.g., 15 days window) and perform some operations on dataframe using (pandas) UDFs. I am new to pyspark. I don't know if there is a better/cleaner solution to do it. Below is the sample code that I tried and error message I am getting. {code:java} df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"), (13.00, "2018-03-11T12:27:18+00:00"), (25.00, "2018-03-12T11:27:18+00:00"), (20.00, "2018-03-13T15:27:18+00:00"), (17.00, "2018-03-14T12:27:18+00:00"), (99.00, "2018-03-15T11:27:18+00:00"), (156.00, "2018-03-22T11:27:18+00:00"), (17.00, "2018-03-31T11:27:18+00:00"), (25.00, "2018-03-15T11:27:18+00:00"), (25.00, "2018-03-16T11:27:18+00:00") ], ["id", "ts"]) df = df.withColumn('ts', df.ts.cast('timestamp')) schema = StructType([ StructField("id", IntegerType()), StructField("ts", TimestampType()) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) def some_udf(df): # some computation return df df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show() {code} This throws following exception: {code:java} TypeError: Unsupported type in conversion from Arrow: struct {code} However, if I use builtin agg method then it works all fine.
For example, {code:java} df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False) {code} Output {code:java} +-+--+---+ |id |window|avg(id)| +-+--+---+ |13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0 | |17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0 | |156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0 | |99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0 | |20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0 | |17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0 | |25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0 | +-+--+---+ {code} > Error with struct conversion while using pandas_udf > --- > >
[jira] [Created] (SPARK-28502) Error with struct conversion while using pandas_udf
Nasir Ali created SPARK-28502:
-------------------------------------

             Summary: Error with struct conversion while using pandas_udf
                 Key: SPARK-28502
                 URL: https://issues.apache.org/jira/browse/SPARK-28502
             Project: Spark
          Issue Type: Question
          Components: PySpark
    Affects Versions: 2.4.3
         Environment: OS: Ubuntu
Python: 3.6
            Reporter: Nasir Ali

What I am trying to do: group data based on time intervals (e.g., a 15-day window) and perform some operations on the dataframe using (pandas) UDFs. I am new to pyspark, and I don't know if there is a better/cleaner solution. Below is the sample code that I tried and the error message I am getting.

{code:java}
df = sparkSession.createDataFrame([(17.00, "2018-03-10T15:27:18+00:00"),
                                   (13.00, "2018-03-11T12:27:18+00:00"),
                                   (25.00, "2018-03-12T11:27:18+00:00"),
                                   (20.00, "2018-03-13T15:27:18+00:00"),
                                   (17.00, "2018-03-14T12:27:18+00:00"),
                                   (99.00, "2018-03-15T11:27:18+00:00"),
                                   (156.00, "2018-03-22T11:27:18+00:00"),
                                   (17.00, "2018-03-31T11:27:18+00:00"),
                                   (25.00, "2018-03-15T11:27:18+00:00"),
                                   (25.00, "2018-03-16T11:27:18+00:00")],
                                  ["id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))

schema = StructType([
    StructField("id", IntegerType()),
    StructField("ts", TimestampType())
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def some_udf(df):
    # some computation
    return df

df.groupby('id', F.window("ts", "15 days")).apply(some_udf).show()
{code}

This throws the following exception:

{code:java}
TypeError: Unsupported type in conversion from Arrow: struct<start: timestamp[us, tz=America/Chicago], end: timestamp[us, tz=America/Chicago]>
{code}

However, if I use a builtin agg method then it all works fine. For example,

{code:java}
df.groupby('id', F.window("ts", "15 days")).mean().show(truncate=False)
{code}

Output

{code:java}
+-----+------------------------------------------+-------+
|id   |window                                    |avg(id)|
+-----+------------------------------------------+-------+
|13.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|13.0   |
|17.0 |[2018-03-20 00:00:00, 2018-04-04 00:00:00]|17.0   |
|156.0|[2018-03-20 00:00:00, 2018-04-04 00:00:00]|156.0  |
|99.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|99.0   |
|20.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|20.0   |
|17.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|17.0   |
|25.0 |[2018-03-05 00:00:00, 2018-03-20 00:00:00]|25.0   |
+-----+------------------------------------------+-------+
{code}

-- This message was sent by Atlassian JIRA (v7.6.14#76016) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
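[Editorial note] A workaround that several of the comments above circle around: keep the window struct out of the Arrow exchange entirely by flattening its bounds into plain timestamp columns and grouping on those. A minimal sketch under that assumption, using the df from the repro (column and function names are illustrative; Spark 2.4.x API):

{code:java}
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Each row falls into exactly one 15-day tumbling window, so the window
# bounds can be exposed as ordinary timestamp columns via a projection.
w = F.window("ts", "15 days")
flat = df.withColumn("w_start", w.start).withColumn("w_end", w.end)

@pandas_udf("id double, w_start timestamp, w_end timestamp, avg_id double",
            PandasUDFType.GROUPED_MAP)
def avg_per_window(pdf):
    # pdf contains all of the group's columns, including the key columns.
    return pd.DataFrame({"id": [pdf["id"].iloc[0]],
                         "w_start": [pdf["w_start"].iloc[0]],
                         "w_end": [pdf["w_end"].iloc[0]],
                         "avg_id": [pdf["id"].mean()]})

flat.groupby("id", "w_start", "w_end").apply(avg_per_window).show()
{code}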