[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-13 Thread Modi Tamam (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884330#comment-16884330
 ] 

Modi Tamam commented on SPARK-28269:


[~hyukjin.kwon] I think your diagnosis is wrong and you haven't reached the
problematic action.

The problem is in this line:
{code:java}
full_spark_df.withColumn(grouped_col,F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
{code}
 

And it seems like you haven't reached it.
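
For comparison, here is a minimal control run (a sketch, not part of the original report; it assumes the same shell session and the full_spark_df / very_simpl_udf definitions from the snippet in the description). Grouping by one of the generated columns instead of a constant keeps every group small:
{code:java}
# Hypothetical control run: reuse full_spark_df and very_simpl_udf, but derive
# the grouping key from the generated column "0" (~100 distinct values)
# instead of a constant, so no single group holds the whole data frame.
control_df = full_spark_df.withColumn(grouped_col, F.col("0").cast("string"))
control_df.groupBy(grouped_col).apply(very_simpl_udf).show()
{code}
If this completes while the single-group variant hangs, that points at the size of the one Arrow payload per group rather than at the UDF itself.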

 

> ArrowStreamPandasSerializer get stack
> -
>
> Key: SPARK-28269
> URL: https://issues.apache.org/jira/browse/SPARK-28269
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Modi Tamam
>Priority: Major
> Attachments: Untitled.xcf
>
>
> I'm working with Pyspark version 2.4.3.
> I have a big data frame:
>  * ~15M rows
>  * ~130 columns
>  * ~2.5 GB - I've converted it to a Pandas data frame; pickling it
> (pandas_df.to_pickle()) resulted in a file of size 2.5 GB.
> I have some code that groups this data frame and applies a Pandas UDF:
>  
> {code:java}
> from pyspark.sql import Row
> from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
> from pyspark.sql.types import *
> from pyspark.sql import functions as F
>
> initial_list = range(4500)
> rdd = sc.parallelize(initial_list)
> rdd = rdd.map(lambda x: Row(val=x))
> initial_spark_df = spark.createDataFrame(rdd)
>
> cols_count = 132
> rows = 1000
>
> # --- Start generating the big data frame ---
> # Generating the schema
> schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])
>
> @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
> def random_pd_df_generator(df):
>     import numpy as np
>     import pandas as pd
>     return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)),
>                         columns=range(cols_count))
>
> full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
> # --- End generating the big data frame ---
>
> # --- Start the bug reproduction ---
> grouped_col = "col_0"
>
> @pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
> def very_simpl_udf(pdf):
>     import pandas as pd
>     ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
>     return ret_val
>
> # In order to create one huge group, I've set grouped_col to the same value
> # for every row, then grouped by it.
> # Here is where the program gets stuck
> full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
> assert False, "If we're here, it means the issue wasn't reproduced"
> {code}
>  
> The above code gets stuck in the ArrowStreamPandasSerializer (on the first
> line, when reading a batch from the reader):
>  
> {code:java}
> for batch in reader:
>     yield [self.arrow_to_pandas(c) for c in
>            pa.Table.from_batches([batch]).itercolumns()]
> {code}
>  
> You can just run the first code snippet and it will reproduce the issue.
> Open a PySpark shell with this configuration:
> {code:java}
> pyspark --conf "spark.python.worker.memory=3G" \
>   --conf "spark.executor.memory=20G" \
>   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
>   --conf "spark.driver.memory=10G"
> {code}
>  
> Versions:
>  * pandas - 0.24.2
>  * pyarrow - 0.13.0
>  * Spark - 2.4.2
>  * Python - 2.7.16
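
Since a GROUPED_MAP UDF ships each group to a Python worker as one Arrow payload, a quick way to see how large the problematic group gets (a sketch reusing full_spark_df and grouped_col from the snippet above, not part of the original report):
{code:java}
# Hypothetical diagnostic: count rows per group before applying the UDF.
# With F.lit('0') as the key, every generated row lands in a single group,
# which is what one Python worker then has to deserialize in one go.
(full_spark_df
 .withColumn(grouped_col, F.lit('0'))
 .groupBy(grouped_col)
 .count()
 .show())
{code}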






[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-11 Thread Modi Tamam (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882846#comment-16882846
 ] 

Modi Tamam commented on SPARK-28269:


[~hyukjin.kwon] Sorry, but it doesn't work on my machine. Can you please
double-check? Maybe it's a package version difference?




[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-08 Thread Modi Tamam (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880942#comment-16880942
 ] 

Modi Tamam commented on SPARK-28269:


Sorry, I had a typo, not stack but *stuck*.

There shouldn't be any exception; the program simply never finishes and never
reaches the assert on the last line.

For reproduction, I used an r4.2xlarge machine on AWS (8 cores, ~60 GB RAM).

When I attach a debugger to the pyspark.daemon process, I can see that it is
stuck inside pyspark/serializers.py, in the load_stream method (line 303):
{code:java}
for batch in reader:{code}
 




[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-08 Thread Modi Tamam (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880216#comment-16880216
 ] 

Modi Tamam commented on SPARK-28269:


[~hyukjin.kwon] I've updated the message with simple code so you can reproduce
it easily.

In addition, there is no stack trace, since the process just gets stuck during
serialization.

 




[jira] [Updated] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-08 Thread Modi Tamam (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Modi Tamam updated SPARK-28269:
---
Description: 
I'm working with Pyspark version 2.4.3.

I have a big data frame:
 * ~15M rows
 * ~130 columns
 * ~2.5 GB - I've converted it to a Pandas data frame; pickling it
(pandas_df.to_pickle()) resulted in a file of size 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:

 
{code:java}
from pyspark.sql import Row
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
from pyspark.sql.types import *
from pyspark.sql import functions as F

initial_list = range(4500)
rdd = sc.parallelize(initial_list)
rdd = rdd.map(lambda x: Row(val=x))
initial_spark_df = spark.createDataFrame(rdd)

cols_count = 132
rows = 1000

# --- Start generating the big data frame ---
# Generating the schema
schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def random_pd_df_generator(df):
    import numpy as np
    import pandas as pd
    return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)),
                        columns=range(cols_count))

full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
# --- End generating the big data frame ---

# --- Start the bug reproduction ---
grouped_col = "col_0"

@pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
def very_simpl_udf(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
    return ret_val

# In order to create one huge group, I've set grouped_col to the same value
# for every row, then grouped by it.
# Here is where the program gets stuck
full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
assert False, "If we're here, it means the issue wasn't reproduced"
{code}
 

The above code gets stuck in the ArrowStreamPandasSerializer (on the first
line, when reading a batch from the reader):

 
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in
           pa.Table.from_batches([batch]).itercolumns()]
{code}
 

You can just run the first code snippet and it will reproduce the issue.

Open a PySpark shell with this configuration:
{code:java}
pyspark --conf "spark.python.worker.memory=3G" \
  --conf "spark.executor.memory=20G" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
  --conf "spark.driver.memory=10G"
{code}
 

Versions:
 * pandas - 0.24.2
 * pyarrow - 0.13.0
 * Spark - 2.4.2
 * Python - 2.7.16
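
For comparing environments across the machines discussed in the comments, a small sketch (not part of the original report; it assumes it is run inside the PySpark shell, where spark is already defined):
{code:java}
# Sketch: print the versions that matter for this report.
import sys
import pandas as pd
import pyarrow as pa
print("pandas :", pd.__version__)
print("pyarrow:", pa.__version__)
print("python :", sys.version.split()[0])
print("spark  :", spark.version)
{code}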



[jira] [Updated] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-07 Thread Modi Tamam (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Modi Tamam updated SPARK-28269:
---
Attachment: Untitled.xcf




[jira] [Created] (SPARK-28269) ArrowStreamPandasSerializer get stack

2019-07-07 Thread Modi Tamam (JIRA)
Modi Tamam created SPARK-28269:
--

 Summary: ArrowStreamPandasSerializer get stack
 Key: SPARK-28269
 URL: https://issues.apache.org/jira/browse/SPARK-28269
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.4.3
Reporter: Modi Tamam


I'm working with Pyspark version 2.4.3.

I have a big data frame:
 * ~15M rows
 * ~130 columns
 * ~2.5 GB - I've converted it to a Pandas data frame; pickling it
(pandas_df.to_pickle()) resulted in a file of size 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:

 
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
import pyarrow.parquet as pq
import pyarrow as pa

non_issued_patch = "31.7996378000_35.2114362000"
issued_patch = "31.7995787833_35.2121463045"

@pandas_udf("patch_name string", PandasUDFType.GROUPED_MAP)
def foo(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({'patch_name': [pdf['patch_name'].iloc[0]]})
    return ret_val

full_df = spark.read.parquet('debug-mega-patch')
df = full_df.filter(F.col("grouping_column") == issued_patch).cache()

df.groupBy("grouping_column").apply(foo).repartition(1).write.mode('overwrite').parquet('debug-df/')
{code}
 

The above code gets stuck in the ArrowStreamPandasSerializer (on the first
line, when reading a batch from the reader):

 
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in
           pa.Table.from_batches([batch]).itercolumns()]
{code}
 

 

 






[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-03 Thread Modi Tamam (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309407#comment-16309407
 ] 

Modi Tamam commented on SPARK-22898:


Sure, no problem. I did double check it on 2.2.1, and it looks just fine.




[jira] [Created] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2017-12-24 Thread Modi Tamam (JIRA)
Modi Tamam created SPARK-22898:
--

 Summary: collect_set aggregation on bucketed table causes an 
exchange stage
 Key: SPARK-22898
 URL: https://issues.apache.org/jira/browse/SPARK-22898
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: Modi Tamam


I'm using Spark 2.2 and doing a POC of Spark's bucketing. I've created a
bucketed table; here's the desc formatted my_bucketed_tbl output:


{code:java}
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|              bundle|              string|   null|
|                 ifa|              string|   null|
|               date_|                date|   null|
|                hour|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|     my_bucketed_tbl|       |
|               Owner|            zeppelin|       |
|             Created|Thu Dec 21 13:43:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|                Type|            EXTERNAL|       |
|            Provider|                 orc|       |
|         Num Buckets|                  16|       |
|      Bucket Columns|             [`ifa`]|       |
|        Sort Columns|             [`ifa`]|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|hdfs:/user/hive/w...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
|  Storage Properties|[serialization.fo...|       |
+--------------------+--------------------+-------+
{code}

When I run explain on a group-by query, I can see that the exchange phase is
spared:


{code:java}
sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
  +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, 
Format: ORC, Location: 
InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}

But when I replace Spark's max function with collect_set, I can see that the
execution plan is the same as for a non-bucketed table, meaning the exchange
phase is not spared:


{code:java}
sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], 
functions=[partial_collect_set(bundle#998, 0, 0)])
  +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, 
Format: ORC, Location: 
InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct
{code}
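
For anyone trying to reproduce this locally, a hedged sketch of how a table like my_bucketed_tbl could be created (the exact DDL isn't in the report; the columns, bucket count and sort column are taken from the desc formatted output above):
{code:java}
# Hypothetical setup, not from the report: a bucketed, sorted ORC table.
spark.sql("""
  CREATE TABLE my_bucketed_tbl (bundle STRING, ifa STRING, date_ DATE, hour INT)
  USING ORC
  CLUSTERED BY (ifa) SORTED BY (ifa) INTO 16 BUCKETS
""")
{code}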

[jira] [Created] (SPARK-21019) read orc when some of the columns are missing in some files

2017-06-08 Thread Modi Tamam (JIRA)
Modi Tamam created SPARK-21019:
--

 Summary:  read orc when some of the columns are missing in some 
files
 Key: SPARK-21019
 URL: https://issues.apache.org/jira/browse/SPARK-21019
 Project: Spark
  Issue Type: Bug
  Components: Input/Output
Affects Versions: 2.1.1
Reporter: Modi Tamam


I'm using Spark 2.1.1.

I'm experiencing an issue when reading a bunch of ORC files where some of the
fields are missing from some of the files (file-1 has fields 'a' and 'b',
file-2 has fields 'a' and 'c'). When I run the same flow with the JSON file
format, everything is fine (you can see it in the code snippets, if you run
them...). My question is whether this is a bug or expected behavior.

I've pushed a Maven project, ready to run; you can find it here:
https://github.com/MordechaiTamam/spark-orc-issue.
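
A minimal, self-contained sketch of the described layout (not from the report, which points to the linked Maven project; the /tmp path is hypothetical):
{code:java}
# file-1 has columns a, b; file-2 has columns a, c - written into one directory.
spark.createDataFrame([(1, 2)], ["a", "b"]).write.mode("append").orc("/tmp/orc_issue")
spark.createDataFrame([(3, 4)], ["a", "c"]).write.mode("append").orc("/tmp/orc_issue")

# Reading the directory back is where the ORC flow misbehaves for the reporter;
# the same three calls with .json(...) instead of .orc(...) work as expected.
spark.read.orc("/tmp/orc_issue").show()
{code}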


