[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2020-02-16 Thread Liya Fan (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038112#comment-17038112
 ] 

Liya Fan commented on ARROW-4890:
-

Sure. [~emkornfi...@gmail.com] is right.
After [~emkornfi...@gmail.com] has finished the implementation of the 64-bit 
buffer, we still have a few follow-up work items to complete before we can 
claim that the 2GB restriction is removed:

1. In ARROW-7610, we apply the 64-bit buffer to the vector implementations and 
add integration tests. This work item is ongoing.
2. In ARROW-6111, we provide new vectors to support 64-bit buffers, as the 
current ones have an offset width of 4 bytes. This work item is ongoing. (We 
would appreciate any feedback or review comments on the PRs for items 1 and 2.)
3. We need another work item to make everything work in IPC scenarios. This is 
tracked by ARROW-7746 and has not started yet. (We would appreciate it if 
anyone could provide a solution to this issue. Otherwise, I will try to provide 
one in a few days.)
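
To illustrate why an offset width of 4 bytes matters, here is a minimal sketch 
in plain Python. It is not Arrow code: the helper name fits_offset is 
hypothetical, and the sketch assumes the offsets are signed integers, so a 
4-byte offset tops out at 2^31 - 1 bytes (just under 2GB) while an 8-byte 
offset does not.

```python
import struct

INT32_MAX = 2**31 - 1  # largest value a signed 4-byte offset can hold


def fits_offset(total_bytes, width_bytes):
    """Return True if total_bytes fits in a signed offset of the given width.

    Hypothetical helper for illustration only; it is not part of Arrow.
    """
    fmt = {4: "<i", 8: "<q"}[width_bytes]  # little-endian int32 / int64
    try:
        struct.pack(fmt, total_bytes)
        return True
    except struct.error:
        # struct refuses values outside the range of the target type
        return False


three_gb = 3 * 1024**3
print("fits in 4-byte offset:", fits_offset(three_gb, 4))
print("fits in 8-byte offset:", fits_offset(three_gb, 8))
```

Under these assumptions, any variable-width data that grows past INT32_MAX 
bytes cannot be addressed with 4-byte offsets, which is the motivation given 
above for the new vectors.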



> [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> -
>
> Key: ARROW-4890
> URL: https://issues.apache.org/jira/browse/ARROW-4890
> Project: Apache Arrow
>  Issue Type: Bug
>  Components: Python
>Affects Versions: 0.8.0
> Environment: Cloudera cdh5.13.3
> Cloudera Spark 2.3.0.cloudera3
>Reporter: Abdeali Kothari
>Priority: Major
> Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
>
> Creating this in Arrow project as the traceback seems to suggest this is an 
> issue in Arrow.
>  Continuation from the conversation on the 
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:
> {noformat}
>   File 
> "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py",
>  line 279, in load_stream
> for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in 
> pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> as the size of the dataset I want to group on increases. Here is a code 
> snippet that reproduces the error.
>  Note: My actual dataset is much larger and has many more unique IDs, and it 
> is a valid use case where I cannot simplify this groupby in any way. I have 
> stripped out all the logic to make this example as simple as I could.
> {code:java}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
> pdf1 = pd.DataFrame(
>   [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>   columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(
>   pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
> pdf2 = pd.DataFrame(
>   [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno",
>     "abcdefghijklmno"]],
>   columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(
>   pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
> def myudf(df):
>     return df
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per 
> executor too.
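
As a rough sanity check on the reproduction script above, the following sketch 
estimates how much data ends up in the single group. It is a back-of-the-envelope 
calculation, not a measurement: it assumes 8 bytes for each long or double 
value, a 4-byte offset plus one byte per character for each (ASCII) string 
value, and it ignores validity bitmaps, padding and message metadata. The thread 
does not confirm that this is exactly how the failing batch is laid out.

```python
# Every row in both frames has the same key (1234567), so the inner join
# pairs each of the 429 rows with each of the 48993 rows, and groupBy puts
# all of them into one group.
rows = 429 * 48993

FIXED_WIDTH = 8   # assumed bytes per long/double value
OFFSET_WIDTH = 4  # assumed bytes per offset of a variable-width value

# String values taken from the script: df1_c3, df1_c4, df2_c3..df2_c6.
string_lengths = [
    len("abcdefghij"),
    len("2000-01-01T00:00:00.000Z"),
    len("abcdefghijklmno"),
    len("2000-01-01"),
    len("abcdefghijklmno"),
    len("abcdefghijklmno"),
]
numeric_columns = 4  # df1_c1, df1_c2, df2_c1, df2_c2

bytes_per_row = numeric_columns * FIXED_WIDTH + sum(
    n + OFFSET_WIDTH for n in string_lengths
)
total_bytes = rows * bytes_per_row
INT32_MAX = 2**31 - 1

print("rows in the group:", rows)
print("estimated bytes per row:", bytes_per_row)
print("estimated total bytes:", total_bytes)
print("exceeds signed 32-bit range:", total_bytes > INT32_MAX)
```

Under these assumptions the group comes to roughly 3GB, which is above the 2GB 
mark discussed in this thread, even though each individual column stays well 
below it.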



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2020-02-16 Thread Micah Kornfield (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17038080#comment-17038080
 ] 

Micah Kornfield commented on ARROW-4890:


There have been a few PRs checked in, but the full end-to-end IPC path has not 
been tested yet.  CC [~fan_li_ya]



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2020-02-16 Thread Raz Cohen (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17037859#comment-17037859
 ] 

Raz Cohen commented on ARROW-4890:
--

Hey guys, is there any news regarding the 2GB issue?



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-21 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16979444#comment-16979444
 ] 

SURESH CHAGANTI commented on ARROW-4890:


Thank you. I will keep you posted on my testing.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-20 Thread Micah Kornfield (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978968#comment-16978968
 ] 

Micah Kornfield commented on ARROW-4890:


Yes, that is the one. It hasn't been tested much yet, but if you have time to 
check whether it works for you, that would be great.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-20 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978652#comment-16978652
 ] 

SURESH CHAGANTI commented on ARROW-4890:


I guess this branch addresses the 2GB issue: 
[https://github.com/emkornfield/arrow/tree/int64_address]. Could you please 
confirm? Thank you.

  



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-20 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16978616#comment-16978616
 ] 

SURESH CHAGANTI commented on ARROW-4890:


Sure, I will create an issue against Spark. Thank you! For now, to work around 
the issue, I will build the code with 
[https://github.com/apache/arrow/pull/5020] and see what happens. Thank you, 
[~emkornfi...@gmail.com] & [~bryanc].



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-18 Thread Micah Kornfield (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16977167#comment-16977167
 ] 

Micah Kornfield commented on ARROW-4890:


I agree, it should probably be on the Spark side (assuming the root cause is 
hitting caps in Arrow).



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-18 Thread Bryan Cutler (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16976844#comment-16976844
 ] 

Bryan Cutler commented on ARROW-4890:
-

Sorry, I'm not aware of any documentation of the limits. It would be great to 
get that written down somewhere, and there should be a better error message for 
this, but maybe that should be done on the Spark side.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-14 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16974270#comment-16974270
 ] 

SURESH CHAGANTI commented on ARROW-4890:


Thank you, [~emkornfi...@gmail.com], I appreciate your time. I have run 
multiple tests with different data sizes, and the shards larger than 2GB 
failed. Glad to see the fix is in progress; it would be great if this change 
rolled out soon. I will be happy to contribute to this issue.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-13 Thread Micah Kornfield (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973960#comment-16973960
 ] 

Micah Kornfield commented on ARROW-4890:


"I am assuming the data that gets sent to pandas_udf method is in the 
uncompressed format", yes, I believe this to be true, but this isn't really the 
main limitation.

Currently, ArrowBufs (the components that hold memory) are limited to less 
than 2GB each.  I need to clean up 
[https://github.com/apache/arrow/pull/5020] to address this (and other 
limitations).  I may actually have misstated the per-shard limits for the 
toPandas functions.

[~cutlerb], do you know what the actual limits are? I can't seem to find any 
documentation on them.
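
As a rough back-of-the-envelope check (a sketch, not a measurement), the single 
group produced by the reproduction script in the issue description can be sized 
against a signed 32-bit limit. The estimate assumes 8-byte integer and float 
columns and UTF-8 string columns with 4-byte offsets, and it ignores validity 
bitmaps, padding, and message metadata. The column widths are read off the 
sample rows in the script; the function name is hypothetical.

```python
INT32_MAX = 2**31 - 1  # largest length a signed 32-bit size field can hold

# Row counts from the reproduction script: every row shares the key 1234567,
# so the inner join yields one group containing the full cross product.
DF1_ROWS = 429
DF2_ROWS = 48993

# Assumed widths (bytes) of the fixed-width columns:
# df1_c1, df1_c2, df2_c1, df2_c2.
FIXED_WIDTH_BYTES = [8, 8, 8, 8]

# String lengths taken from the sample values in the script.
STRING_LENGTHS = [
    len("abcdefghij"),                # df1_c3
    len("2000-01-01T00:00:00.000Z"),  # df1_c4
    len("abcdefghijklmno"),           # df2_c3
    len("2000-01-01"),                # df2_c4
    len("abcdefghijklmno"),           # df2_c5
    len("abcdefghijklmno"),           # df2_c6
]
OFFSET_BYTES = 4  # assumed 32-bit offset stored per string value


def estimate_group_bytes(rows: int) -> int:
    """Rough uncompressed size of one group, ignoring bitmaps and padding."""
    per_row = (
        sum(FIXED_WIDTH_BYTES)
        + sum(STRING_LENGTHS)
        + OFFSET_BYTES * len(STRING_LENGTHS)
    )
    return rows * per_row


group_rows = DF1_ROWS * DF2_ROWS
group_bytes = estimate_group_bytes(group_rows)
print(f"rows in the single group: {group_rows:,}")
print(f"estimated group size:     {group_bytes / 2**30:.2f} GiB")
print(f"exceeds signed 32-bit:    {group_bytes > INT32_MAX}")
```

Under these assumptions the one group comes out above 2GB even though no 
individual column does, which would be consistent with a per-shard limit 
rather than a per-column one. The thread does not confirm the exact mechanism 
behind the error, so treat this only as a sanity check on the sizes involved.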



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-13 Thread Abdeali Kothari (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973456#comment-16973456
 ] 

Abdeali Kothari commented on ARROW-4890:


We changed our pipeline to do it with joins and explodes, using Spark 
functions instead.

That was a while back, so I don't remember the exact specifics of that pipeline.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-13 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973270#comment-16973270
 ] 

SURESH CHAGANTI commented on ARROW-4890:


Thank you, [~AbdealiJK]. May I ask which other approach you are using?



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-12 Thread Abdeali Kothari (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16973076#comment-16973076
 ] 

Abdeali Kothari commented on ARROW-4890:


No, I had to stop using pandas UDFs due to this and find another approach for 
the transformations I need to do. 



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-12 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16972882#comment-16972882
 ] 

SURESH CHAGANTI commented on ARROW-4890:


[~AbdealiJK], did you find any fix or workaround for this error?



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-12 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16972881#comment-16972881
 ] 

SURESH CHAGANTI commented on ARROW-4890:


Got it, thank you, [~emkornfi...@gmail.com]. Is there any way we can increase 
that size?

I am assuming the data that gets sent to pandas_udf method is in the 
uncompressed format.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-07 Thread Micah Kornfield (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969714#comment-16969714
 ] 

Micah Kornfield commented on ARROW-4890:


Yes.  I believe it is 2GB per shard currently.



[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1

2019-11-04 Thread SURESH CHAGANTI (Jira)


[ 
https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16966994#comment-16966994
 ] 

SURESH CHAGANTI commented on ARROW-4890:


[~emkornfi...@gmail.com], is there a limit on how much data we can send to 
pandas_udf? I am seeing the same error as above; my groups are quite large, 
at around 200M records and roughly 2 to 4 GB in size.
