[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-06-03 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Issue Type: Bug  (was: New Feature)

> batch-read parquet files written by streaming returns non-nullable fields in 
> schema
> ---
>
> Key: SPARK-48492
> URL: https://issues.apache.org/jira/browse/SPARK-48492
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.1
> Environment: python --version
> Python 3.9.13
>  
> spark-submit --version
> Welcome to
>                     __
>      / __/__  ___ _/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>       /_/
>                         
> Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
> Branch HEAD
> Compiled by user centos on 2023-06-19T23:01:01Z
> Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
>Reporter: Julien Peloton
>Priority: Major
>
> Hello,
> In the documentation, it is stated that
> > When reading Parquet files, all columns are automatically converted to be 
> > nullable for compatibility reasons.
> While this seems correct for static DataFrames, I have a counter example for 
> streaming ones:
>  
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql import Row
> import pyspark.sql.functions as F
> spark = SparkSession.builder.getOrCreate()
> spark.sparkContext.setLogLevel("WARN")
> df = spark.createDataFrame(
>     [
>         Row(a=1, b=2.0, c="toto"),
>         Row(a=3, b=4.0, c="titi"),
>         Row(a=10, b=11.0, c="tutu"),
>     ]
> )
> # add a non-nullable column
> df = df.withColumn('d', F.lit(1.0))
> print("Original dataframe")
> df.printSchema()
> # Write this on disk
> df.write.parquet('static.parquet')
> # Now load a stream
> df_stream = (
>     spark.readStream.format("parquet")
>     .schema(df.schema)
>     .option("path", "static.parquet")
>     .option("latestFirst", False)
>     .load()
> )
> # add a non-nullable column
> df_stream = df_stream.withColumn('e', F.lit("error"))
> print("Streaming dataframe")
> df_stream.printSchema()
> # Now write the dataframe using writestream
> query = (
>     df_stream.writeStream.outputMode("append")
>     .format("parquet")
>     .option("checkpointLocation", 'test_parquet_checkpoint')
>     .option("path", 'test_parquet')
>     .trigger(availableNow=True)
>     .start()
> )
> spark.streams.awaitAnyTermination()
> # Now read back
> df_stream_2 = spark.read.format("parquet").load("test_parquet")
> print("Static dataframe from the streaming job (read)")
> df_stream_2.printSchema() 
> # Now load a stream
> df_stream_3 = (
>     spark.readStream.format("parquet")
>     .schema(df_stream_2.schema)
>     .option("path", "test_parquet")
>     .option("latestFirst", False)
>     .load()
> )
> print("Streaming dataframe from the streaming job (readStream)")
> df_stream_3.printSchema(){code}
>  
>  
> which outputs:
> {noformat}
> Original dataframe
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = false)
> Streaming dataframe
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = false)
> Static dataframe from the streaming job (read)
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = false)
> Streaming dataframe from the streaming job (readStream)
> root
>  |-- a: long (nullable = true)
>  |-- b: double (nullable = true)
>  |-- c: string (nullable = true)
>  |-- d: double (nullable = true)
>  |-- e: string (nullable = true){noformat}
>  
> So the column `d` is correctly set to `nullable = true` (expected), but the 
> column `e` stays non-nullable when read back with the batch `read` method, 
> while it is correctly set to `nullable = true` when read with `readStream`. 
> Is that expected? According to this old issue, 
> https://issues.apache.org/jira/browse/SPARK-28651, this was supposed to be 
> resolved. Any ideas?
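
For completeness, a minimal sketch of one possible workaround (my own suggestion, not something stated in the documentation): when reading the streaming output back with the batch reader, pass a schema in which every field has been forced to nullable. This assumes the nullability of a user-supplied schema is honoured by the batch Parquet reader; `as_nullable` is a hypothetical helper (not a Spark API), and `spark` / `df_stream_2` refer to the objects defined in the snippet above.

{code:python}
from pyspark.sql.types import StructField, StructType


def as_nullable(schema: StructType) -> StructType:
    # Hypothetical helper: copy of the schema with every top-level field made nullable
    return StructType(
        [StructField(f.name, f.dataType, True, f.metadata) for f in schema.fields]
    )


# Read the streaming output back, forcing all fields to nullable
df_back = (
    spark.read.format("parquet")
    .schema(as_nullable(df_stream_2.schema))
    .load("test_parquet")
)
df_back.printSchema()  # column `e` should now report nullable = true
{code}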





[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job (read)")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job (readStream)")
df_stream_3.printSchema(){code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job (read)
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job (readStream)
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true){noformat}
 

So the column `d` is correctly set to `nullable = true` (expected), but the 
column `e` stays non-nullable when read back with the batch `read` method, 
while it is correctly set to `nullable = true` when read with `readStream`. 
Is that expected? According to this old issue, 
https://issues.apache.org/jira/browse/SPARK-28651, this was supposed to be 
resolved. Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
 

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job")
df_stream_3.printSchema(){code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true){noformat}
 

So the column `d` is correctly set to `nullable = true` (expected), but the 
column `e` stays non-nullable when read back with the batch `read` method, 
while it is correctly set to `nullable = true` when read with `readStream`. 
Is that expected? According to this old issue, 
https://issues.apache.org/jira/browse/SPARK-28651, this was supposed to be 
resolved. Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job")
df_stream_3.printSchema(){code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true){noformat}
 

So the column `d` is correctly set to `nullable = true` (expected), but the 
column `e` stays non-nullable when read back with the batch `read` method, 
while it is correctly set to `nullable = true` when read with `readStream`. 
Is that expected? According to this old PR, 
[https://github.com/apache/spark/pull/25382], it was supposed to be 
resolved. Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() 

# Now load a stream
df_stream_3 = (
    spark.readStream.format("parquet")
    .schema(df_stream_2.schema)
    .option("path", "test_parquet")
    .option("latestFirst", False)
    .load()
)

print("Streaming dataframe from the streaming job")
df_stream_3.printSchema(){code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Streaming dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = true){noformat}
 

So the column `d` is correctly set to `nullable = true` (expected), but the 
column `e` stays non-nullable when read back with the batch `read` method, 
while it is correctly set to `nullable = true` when read with `readStream`. 
Is that expected? According to this old PR, 
[https://github.com/apache/spark/pull/25382], it was supposed to be 
resolved. Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = 

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for static DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false){noformat}
 

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this old PR 
[https://github.com/apache/spark/pull/25382], it was supposed to be resolved. 
Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false){noformat}
 

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this old PR 

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")

print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false){noformat}
 

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this old PR 
[https://github.com/apache/spark/pull/25382], it was supposed to be resolved. 
Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")
print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false){noformat}
 

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this [old 

[jira] [Updated] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-48492:
---
Description: 
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

# add a non-nullable column
df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

# add a non-nullable column
df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")
print("Static dataframe from the streaming job")
df_stream_2.printSchema() {code}
 

 

which outputs:
{noformat}
Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false){noformat}
 

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this [old 
PR|https://github.com/apache/spark/pull/25382] it was supposed to be 
resolved. Any ideas?

  was:
Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

```python

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

 

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")
print("Static dataframe from the streaming job")
df_stream_2.printSchema()

```

which outputs:

```

Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

```

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this [old 
PR](https://github.com/apache/spark/pull/25382) it was supposed to be 
resolved. Any ideas?


> batch-read parquet files 

[jira] [Created] (SPARK-48492) batch-read parquet files written by streaming returns non-nullable fields in schema

2024-05-31 Thread Julien Peloton (Jira)
Julien Peloton created SPARK-48492:
--

 Summary: batch-read parquet files written by streaming returns 
non-nullable fields in schema
 Key: SPARK-48492
 URL: https://issues.apache.org/jira/browse/SPARK-48492
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 3.4.1
 Environment: python --version
Python 3.9.13

 

spark-submit --version
Welcome to
                    __
     / __/__  ___ _/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
      /_/
                        
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 1.8.0_302
Branch HEAD
Compiled by user centos on 2023-06-19T23:01:01Z
Revision 6b1ff22dde1ead51cbf370be6e48a802daae58b6
Reporter: Julien Peloton


Hello,

In the documentation, it is stated that

> When reading Parquet files, all columns are automatically converted to be 
> nullable for compatibility reasons.

While this seems correct for batch DataFrames, I have a counter example for 
streaming ones:

```python

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("WARN")

df = spark.createDataFrame(
    [
        Row(a=1, b=2.0, c="toto"),
        Row(a=3, b=4.0, c="titi"),
        Row(a=10, b=11.0, c="tutu"),
    ]
)

df = df.withColumn('d', F.lit(1.0))

print("Original dataframe")
df.printSchema()

# Write this on disk
df.write.parquet('static.parquet')

# Now load a stream
df_stream = (
    spark.readStream.format("parquet")
    .schema(df.schema)
    .option("path", "static.parquet")
    .option("latestFirst", False)
    .load()
)

df_stream = df_stream.withColumn('e', F.lit("error"))

print("Streaming dataframe")
df_stream.printSchema()

 

# Now write the dataframe using writestream
query = (
    df_stream.writeStream.outputMode("append")
    .format("parquet")
    .option("checkpointLocation", 'test_parquet_checkpoint')
    .option("path", 'test_parquet')
    .trigger(availableNow=True)
    .start()
)

spark.streams.awaitAnyTermination()

# Now read back
df_stream_2 = spark.read.format("parquet").load("test_parquet")
print("Static dataframe from the streaming job")
df_stream_2.printSchema()

```

which outputs:

```

Original dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = false)

Streaming dataframe
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

Static dataframe from the streaming job
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)
 |-- e: string (nullable = false)

```

So the column `d` is correctly set to `nullable = true`, but not the column 
`e`, which stays non-nullable. Is that expected? According to this [old 
PR](https://github.com/apache/spark/pull/25382) it was supposed to be 
resolved. Any ideas?
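
For what it is worth, one way to narrow this down (a diagnostic sketch of mine, not part of the report; it assumes pyarrow is available) is to inspect the Parquet footer of one of the files written by the streaming sink, to see whether the non-nullability comes from the files themselves or from the batch reader:

```python
import glob

import pyarrow.parquet as pq

# Pick one data file produced by the streaming sink and print its Arrow schema.
# A column shown as "not null" is marked required in the Parquet footer itself;
# otherwise the nullable = false reported by spark.read comes from the reader.
path = glob.glob("test_parquet/part-*.parquet")[0]
print(pq.read_schema(path))
```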





[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import

2022-03-07 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-38435:
---
Description: 
h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:
{code:python}
# mymod.py
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}
I can import it and use it as:
{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show() {code}
and launch it via:
{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: 
UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to 
specify type hints for pandas UDF instead of specifying pandas 
UDF type which will be deprecated in the future releases. See 
SPARK-28264 for more details.
+--+
|to_upper(name)|
+--+
|      JOHN DOE|
+--+ {code}
As expected, we obtain the `UserWarning`, but the code works fine.
h2. New style pandas UDF: using type hint

Let's now switch to the version using type hints:
{code:python}
# mymod.py
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper() {code}
But this time, I obtain an `AttributeError`:
{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 835, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 839, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 
2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 
5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py",
 line 432, in _create_pandas_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 43, in _create_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 206, in _wrapped
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 96, in returnType
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 841, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 831, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger 
(org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info. {code}
 

The code crashes at import time. Looking at the code, the SparkContext needs 
to exist:

[https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827]

which is not the case at the time of the import.
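
A possible workaround (my own sketch based on the traceback above, not a confirmed fix): the failure happens while the DDL string `"string"` is parsed into a DataType, so passing a `DataType` object directly, as the old-style UDF above already does, should avoid `_parse_datatype_string` at import time:

{code:python}
# mymod.py -- hypothetical variant: keep the type hints, but pass a DataType object
import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


@pandas_udf(StringType())  # no DDL string to parse, so no JVM is needed at import time
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}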
h2. Questions

First, am I doing something wrong? I do not see any mention of this in the 
documentation 
([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]),
 and it seems it could affect many users who are moving from old-style to 
new-style pandas UDFs.

Second, is this the expected behaviour? Compared to the old-style pandas UDF, 
where the module can be imported without any problem, the new behaviour looks 
like a regression. Why would users have to have an active SparkContext just to 
import a module that contains a pandas UDF?

Third, what could we do? I see that an assert has recently been added in the 
master branch (https://issues.apache.org/jira/browse/SPARK-37620):


[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import

2022-03-07 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-38435:
---
Description: 
h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:
{code:python}
# mymod.py
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}
I can import it and use it as:
{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show() {code}
and launch it via:
{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: 
UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to 
specify type hints for pandas UDF instead of specifying pandas 
UDF type which will be deprecated in the future releases. See 
SPARK-28264 for more details.
+--+
|to_upper(name)|
+--+
|      JOHN DOE|
+--+ {code}
Except for the `UserWarning`, the code works as expected.
h2. New style pandas UDF: using type hint

Let's now switch to the version using type hints:
{code:python}
# mymod.py
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper() {code}
But this time, I obtain an `AttributeError`:
{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 835, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 839, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 
2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 
5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py",
 line 432, in _create_pandas_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 43, in _create_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 206, in _wrapped
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 96, in returnType
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 841, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 831, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger 
(org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info. {code}
 

The code crashes at import time. Looking at the code, the SparkContext needs 
to exist:

[https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827]

which is not the case at the time of the import.
h2. Questions

First, am I doing something wrong? I do not see any mention of this in the 
documentation 
([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]),
 and it seems it could affect many users who are moving from old-style to 
new-style pandas UDFs.

Second, is this the expected behaviour? Compared to the old-style pandas UDF, 
where the module can be imported without any problem, the new behaviour looks 
like a regression. Why would users have to have an active SparkContext just to 
import a module that contains a pandas UDF?

Third, what could we do? I see that an assert has recently been added in the 
master branch (https://issues.apache.org/jira/browse/SPARK-37620):


[jira] [Updated] (SPARK-38435) Pandas UDF with type hints crashes at import

2022-03-07 Thread Julien Peloton (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-38435:
---
Description: 
h2. Old style pandas UDF

Let's consider a pandas UDF defined in the old style:
{code:python}
# mymod.py
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()
{code}
I can import it and use it as:
{code:python}
# main.py
from pyspark.sql import SparkSession
from mymod import to_upper

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show() {code}
and launch it via:
{code:bash}
spark-submit main.py
spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py:392: 
UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to 
specify type hints for pandas UDF instead of specifying pandas 
UDF type which will be deprecated in the future releases. See 
SPARK-28264 for more details.
+--+
|to_upper(name)|
+--+
|      JOHN DOE|
+--+ {code}
Except for the `UserWarning`, the code works as expected.
h2. New style pandas UDF: using type hint

Let's now switch to the version using type hints:
{code:python}
# mymod.py
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper() {code}
But this time, I obtain an `AttributeError`:
{code:bash}
spark-submit main.py
Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 835, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 839, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 827, in from_ddl_datatype
AttributeError: 'NoneType' object has no attribute '_jvm'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/main.py", line 
2, in <module>
    from mymod import to_upper
  File "/Users/julien/Documents/workspace/myrepos/fink-filters/mymod.py", line 
5, in <module>
    def to_upper(s: pd.Series) -> pd.Series:
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/pandas/functions.py",
 line 432, in _create_pandas_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 43, in _create_udf
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 206, in _wrapped
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/udf.py",
 line 96, in returnType
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 841, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 831, in _parse_datatype_string
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/types.py",
 line 823, in from_ddl_schema
AttributeError: 'NoneType' object has no attribute '_jvm'
log4j:WARN No appenders could be found for logger 
(org.apache.spark.util.ShutdownHookManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info. {code}
 

The code crashes at import time. Looking at the code, the Spark context needs 
to exist:

[https://github.com/apache/spark/blob/8d70d5da3d74ebdd612b2cdc38201e121b88b24f/python/pyspark/sql/types.py#L819-L827]

which is not the case at import time.
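
For comparison, decorating with a `DataType` instance instead of a DDL string 
should avoid the parse (and hence the JVM lookup) at import time. A minimal, 
untested sketch of that variant:

{code:python}
# mymod.py -- hypothetical variant, not part of the original report
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Passing StringType() directly means there is no DDL string to parse,
# so no active SparkContext should be required at import time.
@pandas_udf(StringType())
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()
{code}
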
h2. Questions

First, am I doing something wrong? I do not see any mention of this in the 
documentation 
([https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html]),
and it seems it should affect many users who are moving from old-style to 
new-style pandas UDFs.

Second, is this the expected behaviour? Given that a module with an old-style 
pandas UDF can be imported without any problem, the new behaviour looks like a 
regression. Why should users need an active Spark context just to import a 
module that contains a pandas UDF?

Third, what could we do? I see that an assert has recently been added in the 
master branch (https://issues.apache.org/jira/browse/SPARK-37620).
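
One possible workaround, sketched below purely for illustration, is to defer 
the decoration until a Spark session exists, for instance behind a small 
factory function (the names here are illustrative, not from the report):

{code:python}
# mymod.py -- illustrative restructuring, not part of the original report
import pandas as pd
from pyspark.sql.functions import pandas_udf

def make_to_upper():
    # The DDL string "string" is only parsed when this factory is called,
    # i.e. after the caller has already created a SparkSession/SparkContext.
    @pandas_udf("string")
    def to_upper(s: pd.Series) -> pd.Series:
        return s.str.upper()
    return to_upper

# main.py would then do:
#   spark = SparkSession.builder.getOrCreate()
#   to_upper = make_to_upper()
{code}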


[jira] [Created] (SPARK-38435) Pandas UDF with type hints crashes at import

2022-03-07 Thread Julien Peloton (Jira)
Julien Peloton created SPARK-38435:
--

 Summary: Pandas UDF with type hints crashes at import
 Key: SPARK-38435
 URL: https://issues.apache.org/jira/browse/SPARK-38435
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 3.1.0
 Environment: Spark: 3.1
Python: 3.7
Reporter: Julien Peloton



[jira] [Commented] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)

2019-10-07 Thread Julien Peloton (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16946266#comment-16946266
 ] 

Julien Peloton commented on SPARK-29367:


Thanks, it works like a charm!

Julien
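
For anyone hitting the same error: the fix referred to above is presumably the 
compatibility setting documented for Spark 2.4.x with PyArrow >= 0.15.0, i.e. 
the {{ARROW_PRE_0_15_IPC_FORMAT=1}} environment variable. A minimal sketch of 
one way to apply it (the conf-based propagation to executors is an assumption, 
not taken from this thread):

{code:python}
# Sketch only: ask PyArrow >= 0.15.0 to use the legacy Arrow IPC format so
# that pandas UDFs keep working with Spark 2.4.x.
import os

from pyspark.sql import SparkSession

# Driver-side Python process.
os.environ["ARROW_PRE_0_15_IPC_FORMAT"] = "1"

# Executor-side Python workers (spark.executorEnv.* forwards the variable).
spark = (
    SparkSession.builder
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .getOrCreate()
)
{code}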

> pandas udf not working with latest pyarrow release (0.15.0)
> ---
>
> Key: SPARK-29367
> URL: https://issues.apache.org/jira/browse/SPARK-29367
> Project: Spark
>  Issue Type: Documentation
>  Components: PySpark
>Affects Versions: 2.4.0, 2.4.1, 2.4.3
>Reporter: Julien Peloton
>Assignee: Bryan Cutler
>Priority: Major
>
> Hi,
> I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my 
> pyspark jobs using pandas udf are failing with 
> java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 
> 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import BooleanType
> import pandas as pd
> @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
> def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
> """ Apply simple quality cuts
> Returns
> --
> out: pandas.Series of booleans
> Return a Pandas DataFrame with the appropriate flag: false for bad alert,
> and true for good alert.
> """
> mask = nbad.values == 0
> mask *= rb.values >= 0.55
> mask *= abs(magdiff.values) <= 0.1
> return pd.Series(mask)
> spark = SparkSession.builder.getOrCreate()
> # Create dummy DF
> colnames = ["nbad", "rb", "magdiff"]
> df = spark.sparkContext.parallelize(
> zip(
> [0, 1, 0, 0],
> [0.01, 0.02, 0.6, 0.01],
> [0.02, 0.05, 0.1, 0.01]
> )
> ).toDF(colnames)
> df.show()
> # Apply cuts
> df = df\
> .withColumn("toKeep", qualitycuts(*colnames))\
> .filter("toKeep == true")\
> .drop("toKeep")
> # This will fail if latest pyarrow 0.15.0 is used
> df.show()
> {code}
> and the log is:
> {code}
> Driver stacktrace:
> 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at 
> NativeMethodAccessorImpl.java:0, took 0.660523 s
> Traceback (most recent call last):
>   File 
> "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 
> 44, in 
> df.show()
>   File 
> "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
>  line 378, in show
>   File 
> "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
>  line 1257, in __call__
>   File 
> "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
>  line 63, in deco
>   File 
> "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
> in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 
> (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>   at 
> org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>   at 
> org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>   at 
> org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>   at 
> org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>   at 
> org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>   at 
> org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>   at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>   at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>   at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>   at 
> org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.(ArrowEvalPythonExec.scala:98)
>   at 
> org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
>   at 
> org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
>   at 
> org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
>   at 
> 

[jira] [Created] (SPARK-29367) pandas udf not working with latest pyarrow release (0.15.0)

2019-10-07 Thread Julien Peloton (Jira)
Julien Peloton created SPARK-29367:
--

 Summary: pandas udf not working with latest pyarrow release 
(0.15.0)
 Key: SPARK-29367
 URL: https://issues.apache.org/jira/browse/SPARK-29367
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.3, 2.4.1, 2.4.0
Reporter: Julien Peloton


Hi,

I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my 
pyspark jobs using pandas udf are failing with 
java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 2.4.3). 
Here is a full example to reproduce the failure with pyarrow 0.15:

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import BooleanType

import pandas as pd

@pandas_udf(BooleanType(), PandasUDFType.SCALAR)
def qualitycuts(nbad: int, rb: float, magdiff: float) -> pd.Series:
    """ Apply simple quality cuts

    Returns
    -------
    out: pandas.Series of booleans
        Return a Pandas Series with the appropriate flag: false for a bad
        alert, and true for a good alert.
    """
    mask = nbad.values == 0
    mask *= rb.values >= 0.55
    mask *= abs(magdiff.values) <= 0.1

    return pd.Series(mask)


spark = SparkSession.builder.getOrCreate()

# Create a dummy DataFrame
colnames = ["nbad", "rb", "magdiff"]
df = spark.sparkContext.parallelize(
    zip(
        [0, 1, 0, 0],
        [0.01, 0.02, 0.6, 0.01],
        [0.02, 0.05, 0.1, 0.01]
    )
).toDF(colnames)

df.show()

# Apply cuts
df = df\
    .withColumn("toKeep", qualitycuts(*colnames))\
    .filter("toKeep == true")\
    .drop("toKeep")

# This will fail if the latest pyarrow 0.15.0 is used
df.show()
{code}

and the log is:

{code}
Driver stacktrace:
19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at 
NativeMethodAccessorImpl.java:0, took 0.660523 s
Traceback (most recent call last):
  File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", 
line 44, in 
df.show()
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py",
 line 378, in show
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
 line 1257, in __call__
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py",
 line 63, in deco
  File 
"/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py",
 line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 
5, localhost, executor driver): java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
at 
org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
at 
org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
at 
org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
at 
org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
at 
org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
at 
org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
at 
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at 
org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.(ArrowEvalPythonExec.scala:98)
at 
org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
at 
org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at 

[jira] [Commented] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour

2018-11-13 Thread Julien Peloton (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685638#comment-16685638
 ] 

Julien Peloton commented on SPARK-26024:


[~cloud_fan] I would not advocate increasing the size of 
{{sampleSizePerPartition}} either, but mentioning its impact on the result in 
the docs makes sense to me. I will submit a patch then, thanks.
 
??Why would you need a super accurate range partitioner for your (large) data 
set??

It's more about reproducibility than anything else. For a given set of input 
parameters, I want the same repartitioned output, partition sizes included. In 
addition, I work in astronomy and I am used to the low-level RDD API (using 
{{partitioners}}), which in my opinion is more flexible and fine-grained for 
custom partitioning.
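
For reference, the knob discussed above appears to be the internal SQL conf 
{{spark.sql.execution.rangeExchange.sampleSizePerPartition}} (default 100); the 
exact name is an assumption on my side. A hedged sketch of raising it before 
repartitioning, mirroring in Python the snippet quoted below:

{code:python}
# Sketch only: raise the range-partitioner sample size so that the
# per-partition counts produced by repartitionByRange become stable.
import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed internal conf controlling the reservoir sample size (default 100).
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", 10000)

# Shuffle numbers from 0 to 1000, and make a DataFrame.
vals = list(range(1001))
random.shuffle(vals)
df = spark.createDataFrame([(v,) for v in vals], ["val"])

# Count the number of elements per partition, several times.
for _ in range(10):
    counts = (
        df.repartitionByRange(3, "val")
        .rdd.mapPartitions(lambda part: [sum(1 for _ in part)])
        .collect()
    )
    print(counts)
{code}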

> Dataset API: repartitionByRange(...) has inconsistent behaviour
> ---
>
> Key: SPARK-26024
> URL: https://issues.apache.org/jira/browse/SPARK-26024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2
> Environment: Spark version 2.3.2
>Reporter: Julien Peloton
>Priority: Major
>  Labels: dataFrame, partitioning, repartition, spark-sql
>
> Hi,
> I recently played with the {{repartitionByRange}} method for DataFrame 
> introduced in SPARK-22614. For DataFrames larger than the one tested in the 
> code (which has only 10 elements), the code sends back random results.
> As a test for showing the inconsistent behaviour, I start as the unit code 
> used to test {{repartitionByRange}} 
> ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352])
>  but I increase the size of the initial array to 1000, repartition using 3 
> partitions, and count the number of element per-partitions:
>  
> {code}
> // Shuffle numbers from 0 to 1000, and make a DataFrame
> val df = Random.shuffle(0.to(1000)).toDF("val")
> // Repartition it using 3 partitions
> // Sum up number of elements in each partition, and collect it.
> // And do it several times
> for (i <- 0 to 9) {
>   var counts = df.repartitionByRange(3, col("val"))
>     .mapPartitions{part => Iterator(part.size)}
> .collect()
>   println(counts.toList)
> }
> // -> the number of elements in each partition varies...
> {code}
> I do not know whether it is expected (I will dig further in the code), but it 
> sounds like a bug.
>  Or I just misinterpret what {{repartitionByRange}} is for?
>  Any ideas?
> Thanks!
>  Julien






[jira] [Commented] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour

2018-11-13 Thread Julien Peloton (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16685181#comment-16685181
 ] 

Julien Peloton commented on SPARK-26024:


Hi Marco and thanks for your quick reply.

You are absolutely right: setting a higher {{sampleSizePerPartition}} makes the 
number of elements the same across all iterations.

I note, however, the lack of documentation on this. In my opinion, adding 
documentation is quite crucial, as the default {{sampleSizePerPartition}} is 
quite small (100), so in most real-life cases the same input to 
{{repartitionByRange}} will lead to a varying number of elements per partition.

> Dataset API: repartitionByRange(...) has inconsistent behaviour
> ---
>
> Key: SPARK-26024
> URL: https://issues.apache.org/jira/browse/SPARK-26024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2
> Environment: Spark version 2.3.2
>Reporter: Julien Peloton
>Priority: Major
>  Labels: dataFrame, partitioning, repartition, spark-sql
>
> Hi,
> I recently played with the {{repartitionByRange}} method for DataFrame 
> introduced in SPARK-22614. For DataFrames larger than the one tested in the 
> code (which has only 10 elements), the code sends back random results.
> As a test for showing the inconsistent behaviour, I start as the unit code 
> used to test {{repartitionByRange}} 
> ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352])
>  but I increase the size of the initial array to 1000, repartition using 3 
> partitions, and count the number of element per-partitions:
>  
> {code}
> // Shuffle numbers from 0 to 1000, and make a DataFrame
> val df = Random.shuffle(0.to(1000)).toDF("val")
> // Repartition it using 3 partitions
> // Sum up number of elements in each partition, and collect it.
> // And do it several times
> for (i <- 0 to 9) {
>   var counts = df.repartitionByRange(3, col("val"))
>     .mapPartitions{part => Iterator(part.size)}
> .collect()
>   println(counts.toList)
> }
> // -> the number of elements in each partition varies...
> {code}
> I do not know whether it is expected (I will dig further in the code), but it 
> sounds like a bug.
>  Or I just misinterpret what {{repartitionByRange}} is for?
>  Any ideas?
> Thanks!
>  Julien






[jira] [Updated] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour

2018-11-12 Thread Julien Peloton (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Peloton updated SPARK-26024:
---
Affects Version/s: 2.3.0
   2.3.1

> Dataset API: repartitionByRange(...) has inconsistent behaviour
> ---
>
> Key: SPARK-26024
> URL: https://issues.apache.org/jira/browse/SPARK-26024
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0, 2.3.1, 2.3.2
> Environment: Spark version 2.3.2
>Reporter: Julien Peloton
>Priority: Major
>  Labels: dataFrame, partitioning, repartition, spark-sql
>
> Hi,
> I recently played with the {{repartitionByRange}} method for DataFrame 
> introduced in SPARK-22614. For DataFrames larger than the one tested in the 
> code (which has only 10 elements), the code sends back random results.
> As a test for showing the inconsistent behaviour, I start as the unit code 
> used to test {{repartitionByRange}} 
> ([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352])
>  but I increase the size of the initial array to 1000, repartition using 3 
> partitions, and count the number of element per-partitions:
>  
> {code}
> // Shuffle numbers from 0 to 1000, and make a DataFrame
> val df = Random.shuffle(0.to(1000)).toDF("val")
> // Repartition it using 3 partitions
> // Sum up number of elements in each partition, and collect it.
> // And do it several times
> for (i <- 0 to 9) {
>   var counts = df.repartitionByRange(3, col("val"))
>     .mapPartitions{part => Iterator(part.size)}
> .collect()
>   println(counts.toList)
> }
> // -> the number of elements in each partition varies...
> {code}
> I do not know whether it is expected (I will dig further in the code), but it 
> sounds like a bug.
>  Or I just misinterpret what {{repartitionByRange}} is for?
>  Any ideas?
> Thanks!
>  Julien






[jira] [Created] (SPARK-26024) Dataset API: repartitionByRange(...) has inconsistent behaviour

2018-11-12 Thread Julien Peloton (JIRA)
Julien Peloton created SPARK-26024:
--

 Summary: Dataset API: repartitionByRange(...) has inconsistent 
behaviour
 Key: SPARK-26024
 URL: https://issues.apache.org/jira/browse/SPARK-26024
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.2
 Environment: Spark version 2.3.2
Reporter: Julien Peloton


Hi,

I recently played with the {{repartitionByRange}} method for DataFrame 
introduced in SPARK-22614. For DataFrames larger than the one used in the unit 
test (which has only 10 elements), the method returns inconsistent results.

To demonstrate the inconsistent behaviour, I start from the unit test code for 
{{repartitionByRange}} 
([here|https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala#L352]),
but I increase the size of the initial array to 1000, repartition using 3 
partitions, and count the number of elements per partition:

 
{code}
// Shuffle numbers from 0 to 1000, and make a DataFrame
val df = Random.shuffle(0.to(1000)).toDF("val")

// Repartition it using 3 partitions
// Sum up number of elements in each partition, and collect it.
// And do it several times
for (i <- 0 to 9) {
  val counts = df.repartitionByRange(3, col("val"))
    .mapPartitions { part => Iterator(part.size) }
    .collect()
  println(counts.toList)
}
// -> the number of elements in each partition varies...
{code}
I do not know whether this is expected (I will dig further into the code), but 
it sounds like a bug.
Or do I just misinterpret what {{repartitionByRange}} is for?
Any ideas?

Thanks!
Julien


