[jira] [Commented] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct

2020-10-19 Thread colin fang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17217001#comment-17217001
 ] 

colin fang commented on SPARK-33184:


I notice there is an exclamation mark before `Project` in the physical plan. What does `!Project` mean?

> spark doesn't read data source column if it is used as an index to an array 
> under a struct
> --
>
> Key: SPARK-33184
> URL: https://issues.apache.org/jira/browse/SPARK-33184
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:python}
> df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
> df.write.mode('overwrite').parquet('test')
> {code}
> {code:python}
> # This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
> x#720 in [y#721]"
> spark.read.parquet('test').select(F.expr('y.a[x]')).show()
> # Explain works fine, note it doesn't read x in ReadSchema
> spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
> == Physical Plan ==
> *(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
> +- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
> Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<y:struct<a:array<int>>>
> {code}
> The code works well if I 
> {code:python}
> # manually select the column it misses
> spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()
> # use element_at function
> spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
> {code}





[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct

2020-10-19 Thread colin fang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

colin fang updated SPARK-33184:
---
Issue Type: Bug  (was: Improvement)

> spark doesn't read data source column if it is used as an index to an array 
> under a struct
> --
>
> Key: SPARK-33184
> URL: https://issues.apache.org/jira/browse/SPARK-33184
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:python}
> df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
> df.write.mode('overwrite').parquet('test')
> {code}
> {code:python}
> # This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
> x#720 in [y#721]"
> spark.read.parquet('test').select(F.expr('y.a[x]')).show()
> # Explain works fine, note it doesn't read x in ReadSchema
> spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
> == Physical Plan ==
> *(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
> +- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
> Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<y:struct<a:array<int>>>
> {code}
> The code works well if I 
> {code:python}
> # manually select the column it misses
> spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()
> # use element_at function
> spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
> {code}





[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is used as an index to an array under a struct

2020-10-19 Thread colin fang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

colin fang updated SPARK-33184:
---
Summary: spark doesn't read data source column if it is used as an index to 
an array under a struct  (was: spark doesn't read data source column if it is 
needed as an index to an array in a nested struct)

> spark doesn't read data source column if it is used as an index to an array 
> under a struct
> --
>
> Key: SPARK-33184
> URL: https://issues.apache.org/jira/browse/SPARK-33184
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:python}
> df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
> df.write.mode('overwrite').parquet('test')
> {code}
> {code:python}
> # This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
> x#720 in [y#721]"
> spark.read.parquet('test').select(F.expr('y.a[x]')).show()
> # Explain works fine, note it doesn't read x in ReadSchema
> spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
> == Physical Plan ==
> *(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
> +- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
> Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<y:struct<a:array<int>>>
> {code}
> The code works well if I 
> {code:python}
> # manually select the column it misses
> spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()
> # use element_at function
> spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
> {code}





[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct

2020-10-19 Thread colin fang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

colin fang updated SPARK-33184:
---
Description: 
{code:python}
df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine, note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()

== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<y:struct<a:array<int>>>
{code}


The code works well if I 

{code:python}
# manually select the column it misses
spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()

# use element_at function
spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
{code}


  was:
{code:python}
df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine, note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()

== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<y:struct<a:array<int>>>
{code}


The code works well if I 

- manually select the column it misses 
`spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()` 
- or use `F.element_at` function 
`spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`




> spark doesn't read data source column if it is needed as an index to an array 
> in a nested struct
> 
>
> Key: SPARK-33184
> URL: https://issues.apache.org/jira/browse/SPARK-33184
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:python}
> df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
> df.write.mode('overwrite').parquet('test')
> {code}
> {code:python}
> # This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
> x#720 in [y#721]"
> spark.read.parquet('test').select(F.expr('y.a[x]')).show()
> # Explain works fine, note it doesn't read x in ReadSchema
> spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
> == Physical Plan ==
> *(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
> +- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
> Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<y:struct<a:array<int>>>
> {code}
> The code works well if I 
> {code:python}
> # manually select the column it misses
> spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()
> # use element_at function
> spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()
> {code}





[jira] [Updated] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct

2020-10-19 Thread colin fang (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

colin fang updated SPARK-33184:
---
Description: 
{code:python}
df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
{code}

{code:python}
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine, note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()

== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<y:struct<a:array<int>>>
{code}


The code works well if I 

- manually select the column it misses 
`spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()` 
- or use `F.element_at` function 
`spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`



  was:
```
df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
```

```
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine, note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()

== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<y:struct<a:array<int>>>

```


The code works well if I 

- manually select the column it misses 
`spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()` 
- or use `F.element_at` function 
`spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`





> spark doesn't read data source column if it is needed as an index to an array 
> in a nested struct
> 
>
> Key: SPARK-33184
> URL: https://issues.apache.org/jira/browse/SPARK-33184
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:python}
> df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
> df.write.mode('overwrite').parquet('test')
> {code}
> {code:python}
> # This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
> x#720 in [y#721]"
> spark.read.parquet('test').select(F.expr('y.a[x]')).show()
> # Explain works fine, note it doesn't read x in ReadSchema
> spark.read.parquet('test').select(F.expr('y.a[x]')).explain()
> == Physical Plan ==
> *(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
> +- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
> Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
> ReadSchema: struct<y:struct<a:array<int>>>
> {code}
> The code works well if I 
> - manually select the column it misses 
> `spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()` 
> - or use `F.element_at` function 
> `spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 
> 1)).show()`





[jira] [Created] (SPARK-33184) spark doesn't read data source column if it is needed as an index to an array in a nested struct

2020-10-19 Thread colin fang (Jira)
colin fang created SPARK-33184:
--

 Summary: spark doesn't read data source column if it is needed as 
an index to an array in a nested struct
 Key: SPARK-33184
 URL: https://issues.apache.org/jira/browse/SPARK-33184
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 3.0.0
Reporter: colin fang


```
df = spark.createDataFrame([[1, [[1, 2, 3]]]], schema='x:int,y:struct<a:array<int>>')
df.write.mode('overwrite').parquet('test')
```

```
# This causes an error "Caused by: java.lang.RuntimeException: Couldn't find 
x#720 in [y#721]"
spark.read.parquet('test').select(F.expr('y.a[x]')).show()

# Explain works fine, note it doesn't read x in ReadSchema
spark.read.parquet('test').select(F.expr('y.a[x]')).explain()

== Physical Plan ==
*(1) !Project [y#713.a[x#712] AS y.a AS `a`[x]#717]
+- FileScan parquet [y#713] Batched: false, DataFilters: [], Format: Parquet, 
Location: InMemoryFileIndex, PartitionFilters: [], PushedFilters: [], 
ReadSchema: struct<y:struct<a:array<int>>>

```


The code works well if I 

- manually select the column it misses 
`spark.read.parquet('test').select(F.expr('y.a[x]'), F.col('x')).show()` 
- or use `F.element_at` function 
`spark.read.parquet('test').select(F.element_at('y.a', F.col('x') + 1)).show()`
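A hedged aside from the editor (not part of the report): the plan above drops `x` from the scan while the projection still references `x#712`, which points at the column/nested-schema-pruning path. If that is the culprit, one unverified mitigation to try is disabling the nested-pruning rule for the session; the config key exists in Spark 3.0, but whether it sidesteps this particular bug is an assumption.

```
import pyspark.sql.functions as F

# Unverified mitigation sketch: turn off nested schema pruning so the scan
# is not narrowed to `y` alone, then retry the failing query.
spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false')
spark.read.parquet('test').select(F.expr('y.a[x]')).show()
```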








[jira] [Created] (SPARK-28148) repartition after join is not optimized away

2019-06-24 Thread colin fang (JIRA)
colin fang created SPARK-28148:
--

 Summary: repartition after join is not optimized away
 Key: SPARK-28148
 URL: https://issues.apache.org/jira/browse/SPARK-28148
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.3
Reporter: colin fang


Partitioning & sorting are usually retained after a join.

{code}
spark.conf.set('spark.sql.shuffle.partitions', '42')

df1 = spark.range(500, numPartitions=5)
df2 = spark.range(1000, numPartitions=5)
df3 = spark.range(2000, numPartitions=5)

# Reuse previous partitions & sort.
df1.join(df2, on='id').join(df3, on='id').explain()
# == Physical Plan ==
# *(8) Project [id#367L]
# +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
#    :- *(5) Project [id#367L]
#    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#    :     :  +- Exchange hashpartitioning(id#367L, 42)
#    :     :     +- *(1) Range (0, 500, step=1, splits=5)
#    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#    :        +- Exchange hashpartitioning(id#369L, 42)
#    :           +- *(3) Range (0, 1000, step=1, splits=5)
#    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
#       +- Exchange hashpartitioning(id#374L, 42)
#          +- *(6) Range (0, 2000, step=1, splits=5)

{code}

However, here the partitioning persists through the left join, but the sort doesn't.

{code}
df1.join(df2, on='id', 
how='left').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(5) Sort [id#367L ASC NULLS FIRST], false, 0
# +- *(5) Project [id#367L]
#    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
#       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#       :  +- Exchange hashpartitioning(id#367L, 42)
#       :     +- *(1) Range (0, 500, step=1, splits=5)
#       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#          +- Exchange hashpartitioning(id#369L, 42)
#             +- *(3) Range (0, 1000, step=1, splits=5)
{code}

Also here, the partitioning does not persist through the inner join.


{code}
df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
# == Physical Plan ==
# *(6) Sort [id#367L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#367L, 42)
#    +- *(5) Project [id#367L]
#       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
#          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
#          :  +- Exchange hashpartitioning(id#367L, 42)
#          :     +- *(1) Range (0, 500, step=1, splits=5)
#          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
#             +- Exchange hashpartitioning(id#369L, 42)
#                +- *(3) Range (0, 1000, step=1, splits=5)
{code}
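A hedged aside (not part of the report): since the inner-join output above is already hash-partitioned on `id`, dropping the explicit repartition and keeping only the partition-local sort should avoid the extra Exchange; a minimal sketch under that assumption:

{code:python}
# Rely on the join's existing hash partitioning on 'id' and only sort
# within partitions; no additional Exchange should be planned.
df1.join(df2, on='id').sortWithinPartitions('id').explain()
{code}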





[jira] [Updated] (SPARK-27759) Do not auto cast array to np.array in vectorized udf

2019-06-11 Thread colin fang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

colin fang updated SPARK-27759:
---
Description: 
{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}
Each element in x is a list of lists, as expected.
{code:java}
df.toPandas()['x']

# 0 [[0.08669612955959993, 0.32624430522634495, 0 
# 1 [[0.29838166086156914,  0.008550172904516762, 0... 
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}
 
{code:java}
def my_udf(x):
# Hack to see what's inside a udf
raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, 
np.stack(x.values).shape)
return pd.Series(x.values)

my_udf = F.pandas_udf(my_udf, returnType=DoubleType())
df.coalesce(1).withColumn('y', my_udf('x')).show()

# Exception: ((11,), (3,), (5,), (11, 3))
{code}
 

A batch (11) of `x` is converted to a pd.Series; however, each element in the 
pd.Series is now a numpy 1d array of numpy 1d arrays. Nested 1d numpy arrays are 
inconvenient to work with in practice in a udf.

 

For example, I need an ndarray of shape (11, 3, 5) in the udf so that I can make 
use of numpy's vectorized operations. If I were given the list of lists intact, I 
could simply do `np.stack(x.values)`; however, that doesn't work here because what 
I receive is a nested numpy 1d array.
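
A hedged aside (not part of the original report): even with the auto-cast, the (11, 3, 5) block can be rebuilt inside the udf by stacking twice. A minimal sketch, assuming the nested object arrays described above; the names `first_cell` / `first_cell_udf` are illustrative only:

{code:python}
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

def first_cell(x):
    # x is a pd.Series whose elements are 1d object arrays of 1d float arrays;
    # stacking each row and then the batch recovers a (batch, 3, 5) ndarray.
    arr = np.stack([np.stack(row) for row in x])
    return pd.Series(arr[:, 0, 0])

first_cell_udf = F.pandas_udf(first_cell, returnType=DoubleType())
df.withColumn('y', first_cell_udf('x')).show()
{code}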

 

 

  was:
{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}
Each element in x is a list of lists, as expected.
{code:java}
df.toPandas()['x']

# 0 [[0.08669612955959993, 0.32624430522634495, 0 
# 1 [[0.29838166086156914,  0.008550172904516762, 0... 
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}
 
{code:java}
def my_udf(x):
# Hack to see what's inside a udf
raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, 
np.stack(x.values).shape)
return pd.Series(x.values)

my_udf = pandas_udf(dot_product, returnType=DoubleType())
df.withColumn('y', my_udf('x')).show()

Exception: ((2,), (3,), (5,), (2, 3))
{code}
 

A batch (2) of `x` is converted to pd.Series, however, each element in the 
pd.Series is now a numpy 1d array of numpy 1d array. It is inconvenient to work 
with nested 1d numpy array in practice in a udf.

 

For example, I need a ndarray of shape (2, 3, 5) in udf, so that I can make use 
of the numpy vectorized operations. If I was given a list of list intact, I can 
simply do `np.stack(x.values)`. However, it doesn't work here as what I 
received is a nested numpy 1d array.

 

 


> Do not auto cast array to np.array in vectorized udf
> 
>
> Key: SPARK-27759
> URL: https://issues.apache.org/jira/browse/SPARK-27759
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: colin fang
>Priority: Minor
>
> {code:java}
> pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
> df = spark.createDataFrame(pd_df).cache()
> {code}
> Each element in x is a list of lists, as expected.
> {code:java}
> df.toPandas()['x']
> # 0 [[0.08669612955959993, 0.32624430522634495, 0 
> # 1 [[0.29838166086156914,  0.008550172904516762, 0... 
> # 2 [[0.641304534802928, 0.2392047548381877, 0.555...
> {code}
>  
> {code:java}
> def my_udf(x):
> # Hack to see what's inside a udf
> raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, 
> np.stack(x.values).shape)
> return pd.Series(x.values)
> my_udf = F.pandas_udf(my_udf, returnType=DoubleType())
> df.coalesce(1).withColumn('y', my_udf('x')).show()
> # Exception: ((11,), (3,), (5,), (11, 3))
> {code}
>  
> A batch (11) of `x` is converted to pd.Series, however, each element in the 
> pd.Series is now a numpy 1d array of numpy 1d array. It is inconvenient to 
> work with nested 1d numpy array in practice in a udf.
>  
> For example, I need a ndarray of shape (11, 3, 5) in udf, so that I can make 
> use of the numpy vectorized operations. If I was given a list of list intact, 
> I can simply do `np.stack(x.values)`. However, it doesn't work here as what I 
> received is a nested numpy 1d array.
>  
>  





[jira] [Created] (SPARK-27759) Do not auto cast array to np.array in vectorized udf

2019-05-17 Thread colin fang (JIRA)
colin fang created SPARK-27759:
--

 Summary: Do not auto cast array to np.array in vectorized 
udf
 Key: SPARK-27759
 URL: https://issues.apache.org/jira/browse/SPARK-27759
 Project: Spark
  Issue Type: Improvement
  Components: PySpark, SQL
Affects Versions: 2.4.3
Reporter: colin fang


{code:java}
pd_df = pd.DataFrame({'x': np.random.rand(11, 3, 5).tolist()})
df = spark.createDataFrame(pd_df).cache()
{code}
Each element in x is a list of lists, as expected.
{code:java}
df.toPandas()['x']

# 0 [[0.08669612955959993, 0.32624430522634495, 0 
# 1 [[0.29838166086156914,  0.008550172904516762, 0... 
# 2 [[0.641304534802928, 0.2392047548381877, 0.555...
{code}
 
{code:java}
def my_udf(x):
# Hack to see what's inside a udf
raise Exception(x.values.shape, x.values[0].shape, x.values[0][0].shape, 
np.stack(x.values).shape)
return pd.Series(x.values)

my_udf = pandas_udf(my_udf, returnType=DoubleType())
df.withColumn('y', my_udf('x')).show()

Exception: ((2,), (3,), (5,), (2, 3))
{code}
 

A batch (2) of `x` is converted to a pd.Series; however, each element in the 
pd.Series is now a numpy 1d array of numpy 1d arrays. Nested 1d numpy arrays are 
inconvenient to work with in practice in a udf.

 

For example, I need an ndarray of shape (2, 3, 5) in the udf so that I can make 
use of numpy's vectorized operations. If I were given the list of lists intact, I 
could simply do `np.stack(x.values)`; however, that doesn't work here because what 
I receive is a nested numpy 1d array.

 

 





[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

2019-04-30 Thread colin fang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830504#comment-16830504
 ] 

colin fang commented on SPARK-17859:


The above case works for me in v2.4
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
df_large = spark.range(1e6)
df_small = F.broadcast(spark.range(10).coalesce(1)).cache()
df_large.join(df_small, "id").explain()


== Physical Plan ==
*(2) Project [id#0L]
+- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
   :- *(2) Range (0, 1000000, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      +- *(1) InMemoryTableScan [id#2L]
            +- InMemoryRelation [id#2L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- Coalesce 1
                     +- *(1) Range (0, 10, step=1, splits=4)
{code}
However, I have definitely seen cases where `F.broadcast` is ignored for a cached 
dataframe. (I am unable to find a minimal example, though.)
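
A hedged follow-up sketch (not verified against such a case): when the planner does fall back to SortMergeJoin for a cached input, re-applying the hint on the cached DataFrame at join time is the first thing to try; `df_small`/`df_large` are redefined here only to make the snippet self-contained:

{code:python}
import pyspark.sql.functions as F

df_small = spark.range(10).coalesce(1).cache()
df_large = spark.range(1000000)
# cache() alone carries no broadcast hint, so the hint is applied at the join site.
df_large.join(F.broadcast(df_small), "id").explain()
{code}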

> persist should not impede with spark's ability to perform a broadcast join.
> ---
>
> Key: SPARK-17859
> URL: https://issues.apache.org/jira/browse/SPARK-17859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.0
> Environment: spark 2.0.0 , Linux RedHat
>Reporter: Franck Tago
>Priority: Major
>
> I am using Spark 2.0.0 
> My investigation leads me to conclude that calling persist could prevent 
> broadcast join  from happening .
> Example
> Case1: No persist call 
> var  df1 =spark.range(100).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
>  var df2 =spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
>  df1.join(df2 , $"id1" === $"id2" ).explain 
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 100, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]))
>+- *Project [id#120L AS id2#123L]
>   +- *Range (0, 1000, splits=2)
> Case 2:  persist call 
>  df1.persist.join(df2 , $"id1" === $"id2" ).explain 
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> : +- InMemoryTableScan [id1#3L]
> ::  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> :: :  +- *Project [id#0L AS id1#3L]
> :: : +- *Range (0, 100, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>+- Exchange hashpartitioning(id2#9L, 10)
>   +- InMemoryTableScan [id2#9L]
>  :  +- InMemoryRelation [id2#9L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
>  : :  +- *Project [id#6L AS id2#9L]
>  : : +- *Range (0, 1000, splits=2)
> Why does the persist call prevent the broadcast join . 
> My opinion is that it should not .
> I was made aware that the persist call is  lazy and that might have something 
> to do with it , but I still contend that it should not . 
> Losing broadcast joins is really costly.





[jira] [Created] (SPARK-27559) Nullable in a given schema is not respected when reading from parquet

2019-04-24 Thread colin fang (JIRA)
colin fang created SPARK-27559:
--

 Summary: Nullable in a given schema is not respected when reading 
from parquet
 Key: SPARK-27559
 URL: https://issues.apache.org/jira/browse/SPARK-27559
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.4.2
Reporter: colin fang


Even if I specify a schema when reading from parquet, the nullable flag is not respected.

{code:java}
spark.range(10, numPartitions=1).write.mode('overwrite').parquet('tmp')
df1 = spark.read.parquet('tmp')
df1.printSchema()
# root
#  |-- id: long (nullable = true)
df2 = spark.read.schema(StructType([StructField('id', LongType(), 
False)])).parquet('tmp')
df2.printSchema()
# root
#  |-- id: long (nullable = true)
{code}
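
A hedged workaround sketch (not from the report): rebuilding the DataFrame against the desired schema does apply nullable=False, at the cost of an extra pass over the data; the field name follows the example above:

{code:python}
from pyspark.sql.types import StructType, StructField, LongType

wanted = StructType([StructField('id', LongType(), False)])
# Re-wrap the rows with the explicit schema; createDataFrame keeps the
# nullability that the parquet reader ignored.
df3 = spark.createDataFrame(df2.rdd, wanted)
df3.printSchema()
# root
#  |-- id: long (nullable = false)
{code}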





[jira] [Created] (SPARK-27217) Nested schema pruning doesn't work for aggregation e.g. `sum`.

2019-03-20 Thread colin fang (JIRA)
colin fang created SPARK-27217:
--

 Summary: Nested schema pruning doesn't work for aggregation e.g. 
`sum`.
 Key: SPARK-27217
 URL: https://issues.apache.org/jira/browse/SPARK-27217
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: colin fang


Since SPARK-4502 is fixed, I would expect a query such as `select sum(b.x)` not to 
have to read the other nested fields.

{code:python}   
 rdd = spark.range(1000).rdd.map(lambda x: [x.id+3, [x.id+1, x.id-1]])
df = spark.createDataFrame(rdd, schema='a:int,b:struct<x:int,y:int>')
df.repartition(1).write.mode('overwrite').parquet('test.parquet')
df = spark.read.parquet('test.parquet')

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'false')
df.select('b.x').explain()
# ReadSchema: struct<b:struct<x:int,y:int>>

spark.conf.set('spark.sql.optimizer.nestedSchemaPruning.enabled', 'true')
df.selectExpr('sum(b.x)').explain()
# ReadSchema: struct<b:struct<x:int,y:int>>
{code}
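
A hedged workaround sketch (not from the report): supplying an explicit read schema that keeps only the needed nested field should force the narrow ReadSchema regardless of the pruning rule; the DDL string assumes the struct layout from the example above:

{code:python}
# Read with a user-specified schema containing only b.x, then aggregate;
# the scan's ReadSchema should then be struct<b:struct<x:int>>.
pruned = spark.read.schema('a int, b struct<x:int>').parquet('test.parquet')
pruned.selectExpr('sum(b.x)').explain()
{code}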


