[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884330#comment-16884330 ] Modi Tamam commented on SPARK-28269: [~hyukjin.kwon] I think your diagnosis is wrong and that you haven't reached the problematic action. The problem is in this row:
{code:java}
full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()
{code}
and it seems you haven't reached it.
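A minimal sketch of a possible workaround (not from the report), assuming the goal is only to exercise the grouped-map UDF rather than to force every row into one group: salt the synthetic key so each group stays small. It reuses the names from the repro snippet; num_salts is a hypothetical parameter.
{code:java}
from pyspark.sql import functions as F

# Hypothetical workaround sketch: instead of F.lit('0'), derive a salted key so
# the grouped-map UDF receives many small groups instead of one huge group.
num_salts = 64  # assumption: any value large enough to keep each group small
salted = full_spark_df.withColumn(
    grouped_col,
    (F.rand() * num_salts).cast("int").cast("string"))

salted.groupBy(grouped_col).apply(very_simpl_udf).show()
{code}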
[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882846#comment-16882846 ] Modi Tamam commented on SPARK-28269: [~hyukjin.kwon] Sorry, but it doesn't work on my machine. Can you please double-check? Maybe it's a difference in package versions?
[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880942#comment-16880942 ] Modi Tamam commented on SPARK-28269: Sorry, I had a typo: not stack but *stuck*. There shouldn't be any exception; the program never finishes and never reaches the assert on the last line. For reproduction I used an r4.2xlarge machine on AWS (8 cores, ~60 GB RAM). When I attach a debugger to the pyspark.daemon, I can see that the process is stuck inside pyspark/serializers.py, in the load_stream method at line 303:
{code:java}
for batch in reader:{code}
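A minimal sanity-check sketch (not part of the thread), assuming the repro snippet has already been run in the same session: a plain count() per group shows how many rows the single synthetic group holds, since a grouped-map pandas UDF has to materialize each whole group in the Python worker.
{code:java}
from pyspark.sql import functions as F

# Count rows per group before running the pandas UDF; with F.lit('0') every
# row lands in the same group, so expect a single group of ~4.5M rows here.
(full_spark_df
    .withColumn(grouped_col, F.lit('0'))
    .groupBy(grouped_col)
    .count()
    .show())
{code}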
[jira] [Commented] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16880216#comment-16880216 ] Modi Tamam commented on SPARK-28269: [~hyukjin.kwon] I've updated the description with simple code so you can reproduce it easily. In addition, there is no stack trace, since the process simply gets stuck during serialization.
[jira] [Updated] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Modi Tamam updated SPARK-28269: --- Description: I'm working with Pyspark version 2.4.3. I have a big data frame:
* ~15M rows
* ~130 columns
* ~2.5 GB - I've converted it to a Pandas data frame and then pickled it (pandas_df.to_pickle()), which resulted in a file of size 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:
{code:java}
from pyspark.sql import Row
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
from pyspark.sql.types import *
from pyspark.sql import functions as F

initial_list = range(4500)
rdd = sc.parallelize(initial_list)
rdd = rdd.map(lambda x: Row(val=x))
initial_spark_df = spark.createDataFrame(rdd)
cols_count = 132
rows = 1000

# --- Start generating the big data frame ---
# Generating the schema
schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def random_pd_df_generator(df):
    import numpy as np
    import pandas as pd
    return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))

full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
# --- End generating the big data frame ---

# --- Start the bug reproduction ---
grouped_col = "col_0"

@pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
def very_simpl_udf(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
    return ret_val

# In order to create a huge group, I've set grouped_col to a single value for all rows, so everything is grouped into a single dataset.
# Here is where the program gets stuck
full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()

assert False, "If we're here, it means the issue wasn't reproduced"
{code}
The above code gets stuck in the ArrowStreamPandasSerializer (on the first line, when reading a batch from the reader):
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]{code}
You can just run the first code snippet and it will reproduce. Open a Pyspark shell with this configuration:
{code:java}
pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"{code}
Versions:
* pandas - 0.24.2
* pyarrow - 0.13.0
* Spark - 2.4.2
* Python - 2.7.16
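For scale, a rough back-of-the-envelope estimate (added for illustration, using only the repro's own parameters) of the one group that the lit('0') trick funnels into the grouped-map UDF:
{code:java}
# Rough size estimate of the one group the grouped-map UDF must materialize.
groups = 4500          # initial_list size -> groups in the generator stage
rows_per_group = 1000  # rows per generated group
cols = 132             # cols_count
bytes_per_cell = 4     # assumption: 32-bit integers

total_rows = groups * rows_per_group             # 4,500,000 rows
approx_bytes = total_rows * cols * bytes_per_cell
print(total_rows, approx_bytes / 1024 ** 3)      # ~4.5e6 rows, ~2.2 GiB of raw ints
{code}
That is roughly the ~2.5 GB order of magnitude quoted in the bullet list above, before any Arrow or pandas overhead.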
[jira] [Updated] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Modi Tamam updated SPARK-28269: --- Description: I'm working with Pyspark version 2.4.3. I have a big data frame:
* ~15M rows
* ~130 columns
* ~2.5 GB - I've converted it to a Pandas data frame and then pickled it (pandas_df.to_pickle()), which resulted in a file of size 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:
{code:java}
from pyspark.sql import Row
from pyspark.sql.functions import lit, pandas_udf, PandasUDFType, to_json
from pyspark.sql.types import *
from pyspark.sql import functions as F

initial_list = range(4500)
rdd = sc.parallelize(initial_list)
rdd = rdd.map(lambda x: Row(val=x))
initial_spark_df = spark.createDataFrame(rdd)
cols_count = 132
rows = 1000

# --- Start generating the big data frame ---
# Generating the schema
schema = StructType([StructField(str(i), IntegerType()) for i in range(cols_count)])

@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def random_pd_df_generator(df):
    import numpy as np
    import pandas as pd
    return pd.DataFrame(np.random.randint(0, 100, size=(rows, cols_count)), columns=range(cols_count))

full_spark_df = initial_spark_df.groupBy("val").apply(random_pd_df_generator)
# --- End generating the big data frame ---

# --- Start the bug reproduction ---
grouped_col = "col_0"

@pandas_udf("%s string" % grouped_col, PandasUDFType.GROUPED_MAP)
def very_simpl_udf(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({grouped_col: [str(pdf[grouped_col].iloc[0])]})
    return ret_val

# In order to create a huge group, I've set grouped_col to a single value for all rows, so everything is grouped into a single dataset.
# Here is where the program gets stuck
full_spark_df.withColumn(grouped_col, F.lit('0')).groupBy(grouped_col).apply(very_simpl_udf).show()

assert False, "If we're here, it means the issue wasn't reproduced"
{code}
The above code gets stuck in the ArrowStreamPandasSerializer (on the first line, when reading a batch from the reader):
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]{code}
You can just run the first code snippet and it will reproduce. Open a Pyspark shell with this configuration:
{code:java}
pyspark --conf "spark.python.worker.memory=3G" --conf "spark.executor.memory=20G" --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf "spark.driver.memory=10G"{code}
Versions:
* pandas - 0.24.2
* pyarrow - 0.13.0
* Spark - 2.4.2
[jira] [Updated] (SPARK-28269) ArrowStreamPandasSerializer get stack
[ https://issues.apache.org/jira/browse/SPARK-28269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Modi Tamam updated SPARK-28269: --- Attachment: Untitled.xcf
[jira] [Created] (SPARK-28269) ArrowStreamPandasSerializer get stack
Modi Tamam created SPARK-28269: -- Summary: ArrowStreamPandasSerializer get stack Key: SPARK-28269 URL: https://issues.apache.org/jira/browse/SPARK-28269 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.4.3 Reporter: Modi Tamam

I'm working with Pyspark version 2.4.3. I have a big data frame:
* ~15M rows
* ~130 columns
* ~2.5 GB - I've converted it to a Pandas data frame and then pickled it (pandas_df.to_pickle()), which resulted in a file of size 2.5 GB.

I have some code that groups this data frame and applies a Pandas UDF:
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
import pyarrow.parquet as pq
import pyarrow as pa

non_issued_patch = "31.7996378000_35.2114362000"
issued_patch = "31.7995787833_35.2121463045"

@pandas_udf("patch_name string", PandasUDFType.GROUPED_MAP)
def foo(pdf):
    import pandas as pd
    ret_val = pd.DataFrame({'patch_name': [pdf['patch_name'].iloc[0]]})
    return ret_val

full_df = spark.read.parquet('debug-mega-patch')
df = full_df.filter(F.col("grouping_column") == issued_patch).cache()
df.groupBy("grouping_column").apply(foo).repartition(1).write.mode('overwrite').parquet('debug-df/')
{code}
The above code gets stuck in the ArrowStreamPandasSerializer (on the first line, when reading a batch from the reader):
{code:java}
for batch in reader:
    yield [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()]{code}
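One plausible way the ~2.5 GB figure above could have been measured (illustrative sketch only; the output path is hypothetical, and collecting a frame of this size to the driver needs enough driver memory):
{code:java}
import os

# Pull the Spark DataFrame to the driver as pandas, pickle it, and check the file size.
pdf = full_df.toPandas()
pdf.to_pickle('/tmp/full_df.pkl')
print(os.path.getsize('/tmp/full_df.pkl') / 1024 ** 3, "GiB")
{code}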
[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage
[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309407#comment-16309407 ] Modi Tamam commented on SPARK-22898: Sure, no problem. I did double check it on 2.2.1, and it looks just fine.
[jira] [Created] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage
Modi Tamam created SPARK-22898: -- Summary: collect_set aggregation on bucketed table causes an exchange stage Key: SPARK-22898 URL: https://issues.apache.org/jira/browse/SPARK-22898 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.3.0 Reporter: Modi Tamam

I'm using Spark 2.2. I'm doing a POC of Spark's bucketing. I've created a bucketed table; here's the desc formatted my_bucketed_tbl output:
+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|              bundle|              string|   null|
|                 ifa|              string|   null|
|               date_|                date|   null|
|                hour|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|     my_bucketed_tbl|       |
|               Owner|            zeppelin|       |
|             Created|Thu Dec 21 13:43:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|                Type|            EXTERNAL|       |
|            Provider|                 orc|       |
|         Num Buckets|                  16|       |
|      Bucket Columns|             [`ifa`]|       |
|        Sort Columns|             [`ifa`]|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|hdfs:/user/hive/w...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
|  Storage Properties|[serialization.fo...|       |
+--------------------+--------------------+-------+
When I execute an explain of a group-by query, I can see that the exchange stage is avoided:
{code:java}
sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
      +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
But when I replace Spark's max function with collect_set, I can see that the execution plan is the same as for a non-bucketed table, meaning the exchange stage is not avoided:
{code:java}
sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
      +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{code}
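A sketch (not from the report) of how a comparable bucketed, sorted ORC table can be produced with the DataFrameWriter API, so the two explain() outputs above can be reproduced; the source table name src_tbl is hypothetical:
{code:java}
# Write a table bucketed and sorted by `ifa` into 16 buckets, stored as ORC.
(spark.table("src_tbl")
    .write
    .format("orc")
    .bucketBy(16, "ifa")
    .sortBy("ifa")
    .mode("overwrite")
    .saveAsTable("my_bucketed_tbl"))

# Compare the plans: the max() aggregation should avoid the Exchange,
# while collect_set() (per this report) still triggers one.
spark.sql("select ifa, max(bundle) from my_bucketed_tbl group by ifa").explain()
spark.sql("select ifa, collect_set(bundle) from my_bucketed_tbl group by ifa").explain()
{code}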
[jira] [Created] (SPARK-21019) read orc when some of the columns are missing in some files
Modi Tamam created SPARK-21019: -- Summary: read orc when some of the columns are missing in some files Key: SPARK-21019 URL: https://issues.apache.org/jira/browse/SPARK-21019 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.1.1 Reporter: Modi Tamam

I'm using Spark 2.1.1. I'm experiencing an issue when reading a bunch of ORC files in which some of the fields are missing from some of the files (file-1 has fields 'a' and 'b', file-2 has fields 'a' and 'c'). When I run the same flow with the JSON file format, everything is just fine (you can see it in the code snippets, if you run them...). My question is whether this is a bug or expected behavior. I've pushed a Maven project, ready to run; you can find it here: https://github.com/MordechaiTamam/spark-orc-issue.
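A minimal PySpark sketch (illustrative only, not taken from the linked repository) of the scenario described: two ORC files with different column sets under one directory, read back with a single explicit schema. Paths are hypothetical.
{code:java}
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

base = "/tmp/orc_issue"  # hypothetical path

# file-1 has columns 'a' and 'b'; file-2 has columns 'a' and 'c'
spark.createDataFrame([Row(a=1, b=2)]).write.mode("overwrite").orc(base + "/f1")
spark.createDataFrame([Row(a=3, c=4)]).write.mode("overwrite").orc(base + "/f2")

schema = StructType([StructField(name, LongType()) for name in ("a", "b", "c")])

# Read both files with one explicit schema. Per this report the ORC path
# misbehaves on 2.1.1 when a column is absent from some files, whereas the
# same flow over JSON fills the missing columns with nulls.
df = spark.read.schema(schema).orc(base + "/*")
df.show()
{code}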