[jira] [Commented] (ARROW-4890) [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038112#comment-17038112 ]

Liya Fan commented on ARROW-4890:

Sure. [~emkornfi...@gmail.com] is right. After [~emkornfi...@gmail.com] has finished the implementation of the 64-bit buffer, we have a few follow-up work items to do before we can claim that the 2GB restriction is removed:

1. In ARROW-7610, we apply the 64-bit buffer to vector implementations and add integration tests. This work item is ongoing.
2. In ARROW-6111, we provide new vectors to support 64-bit buffers, as the current ones have an offset width of 4 bytes. This work item is ongoing. (We would appreciate it if anyone could provide some feedback/review comments for the PRs for 1 and 2.)
3. We need another work item to make everything go well in IPC scenarios. This is tracked by ARROW-7746 and has not started yet. (We would appreciate it if anyone would provide a solution to this issue. Otherwise, I will try to provide a solution in a few days.)

> [Python] Spark+Arrow Grouped pandas UDAF - read length must be positive or -1
> ------------------------------------------------------------------------------
>
>                 Key: ARROW-4890
>                 URL: https://issues.apache.org/jira/browse/ARROW-4890
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>    Affects Versions: 0.8.0
>         Environment: Cloudera cdh5.13.3
>                      Cloudera Spark 2.3.0.cloudera3
>            Reporter: Abdeali Kothari
>            Priority: Major
>         Attachments: Task retry fails.png, image-2019-07-04-12-03-57-002.png
>
> Creating this in the Arrow project as the traceback seems to suggest this is an issue in Arrow.
> Continuation of the conversation on the mailing list:
> https://mail-archives.apache.org/mod_mbox/arrow-dev/201903.mbox/%3CCAK7Z5T_mChuqhFDAF2U68dO=p_1nst5ajjcrg0mexo5kby9...@mail.gmail.com%3E
> When I run a GROUPED_MAP UDF in Spark using PySpark, I run into the error:
> {noformat}
>   File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera3-1.cdh5.13.3.p0.458809/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 279, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 265, in __iter__
>   File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> {noformat}
> as the size of the dataset I want to group on starts increasing. Here is a code snippet where I can reproduce this.
> Note: My actual dataset is much larger and has many more unique IDs, and it is a valid use case where I cannot simplify this groupby in any way. I have stripped out all the logic to make this example as simple as I could.
> {code:java}
> import os
> os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 9G pyspark-shell'
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql import functions as F, types as T
> import pandas as pd
>
> spark = pyspark.sql.SparkSession.builder.getOrCreate()
>
> pdf1 = pd.DataFrame(
>     [[1234567, 0.0, "abcdefghij", "2000-01-01T00:00:00.000Z"]],
>     columns=['df1_c1', 'df1_c2', 'df1_c3', 'df1_c4']
> )
> df1 = spark.createDataFrame(pd.concat([pdf1 for i in range(429)]).reset_index()).drop('index')
>
> pdf2 = pd.DataFrame(
>     [[1234567, 0.0, "abcdefghijklmno", "2000-01-01", "abcdefghijklmno", "abcdefghijklmno"]],
>     columns=['df2_c1', 'df2_c2', 'df2_c3', 'df2_c4', 'df2_c5', 'df2_c6']
> )
> df2 = spark.createDataFrame(pd.concat([pdf2 for i in range(48993)]).reset_index()).drop('index')
>
> df3 = df1.join(df2, df1['df1_c1'] == df2['df2_c1'], how='inner')
>
> def myudf(df):
>     return df
>
> df4 = df3
> udf = F.pandas_udf(df4.schema, F.PandasUDFType.GROUPED_MAP)(myudf)
> df5 = df4.groupBy('df1_c1').apply(udf)
> print('df5.count()', df5.count())
> # df5.write.parquet('/tmp/temp.parquet', mode='overwrite')
> {code}
> I have tried running this on Amazon EMR with Spark 2.3.1 and 20GB RAM per executor too.
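For context on work item 2 in [~fan_li_ya]'s comment above: the default Arrow variable-width types store 32-bit offsets into their value buffer, which is what caps a single column near 2 GB; the "large" variants carry 64-bit offsets. A minimal pyarrow sketch of the difference (assuming pyarrow 0.15 or later, where the large types are exposed on the Python side):

{code:python}
import pyarrow as pa

# The default string type uses 32-bit (int32) offsets, so one array's
# character data cannot exceed roughly 2 GiB.
small = pa.array(["abc", "defg"], type=pa.string())
print(small.type)   # string

# The large_* variants use 64-bit (int64) offsets and can address past 2 GiB.
large = pa.array(["abc", "defg"], type=pa.large_string())
print(large.type)   # large_string

# The offsets buffer is twice as wide for the large variant
# (buffers() returns [validity, offsets, data] for these types).
print(small.buffers()[1].size, large.buffers()[1].size)
{code}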
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17038080#comment-17038080 ]

Micah Kornfield commented on ARROW-4890:

There have been a few PRs checked in, but the full end-to-end IPC path has not been tested yet. CC [~fan_li_ya]
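The IPC path in question is the Arrow stream format that Spark's load_stream consumes. A minimal pyarrow round trip over that path, as a sketch of the kind of end-to-end check meant here (not an actual >2 GB test, which would need a column past the 32-bit offset cap):

{code:python}
import pyarrow as pa

# Build a batch, write it with the stream writer, and read it back --
# the same write/read path where "read length must be positive or -1"
# surfaces once a batch no longer fits 32-bit lengths.
batch = pa.record_batch([pa.array(["x" * 10] * 1000)], names=["c"])

sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

reader = pa.ipc.open_stream(sink.getvalue())
for b in reader:
    assert b.num_rows == 1000
{code}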
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17037859#comment-17037859 ]

Raz Cohen commented on ARROW-4890:

Hey guys, is there any news regarding the 2GB issue?
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16979444#comment-16979444 ]

SURESH CHAGANTI commented on ARROW-4890:

Thank you. I will keep you guys posted on my testing.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16978968#comment-16978968 ]

Micah Kornfield commented on ARROW-4890:

Yes, that is the one. It hasn't been tested too much yet, but if you have time to check whether it works for you, that would be great.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16978652#comment-16978652 ]

SURESH CHAGANTI commented on ARROW-4890:

I guess this branch addresses the 2 GB issue: [https://github.com/emkornfield/arrow/tree/int64_address]. Could you please confirm? Thank you.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16978616#comment-16978616 ]

SURESH CHAGANTI commented on ARROW-4890:

Sure, I will create an issue against Spark, thank you! For now, to get rid of the issue, I will build the code with [https://github.com/apache/arrow/pull/5020] and see what happens. Thank you [~emkornfi...@gmail.com] & [~bryanc]
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16977167#comment-16977167 ]

Micah Kornfield commented on ARROW-4890:

I agree, it should probably be on the Spark side (assuming the root cause is hitting caps in Arrow).
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16976844#comment-16976844 ]

Bryan Cutler commented on ARROW-4890:

Sorry, I'm not sure of any documentation with the limits. It would be great to get that down somewhere, and there should be a better error message for this, but maybe it should be done on the Spark side.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16974270#comment-16974270 ]

SURESH CHAGANTI commented on ARROW-4890:

Thank you [~emkornfi...@gmail.com], appreciate your time. I have run multiple tests with different sizes of data, and any shard larger than 2GB failed. Glad to see the fix is in progress; it would be great if this change rolls out soon. I will be happy to contribute to this issue.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973960#comment-16973960 ]

Micah Kornfield commented on ARROW-4890:

"I am assuming the data that gets sent to pandas_udf method is in the uncompressed format" - yes, I believe this to be true, but it isn't really the main limitation. Currently, ArrowBufs (the components that hold memory) are limited to less than 2GB each. I need to clean up [https://github.com/apache/arrow/pull/5020] to address this (and other limitations).

I actually might have mis-stated the actual limitations per shard for toPandas functions. [~cutlerb] do you know what the actual limits are? I can't seem to find any documentation on it.
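To put a number on that cap: the affected offsets and buffer indices are signed 32-bit integers, so the largest addressable byte position in a single buffer works out as follows (a worked illustration, not project documentation):

{code:python}
# Signed 32-bit indexing caps a single buffer at:
INT32_MAX = 2**31 - 1
print(INT32_MAX)           # 2147483647 bytes
print(INT32_MAX / 2**30)   # ~2.0 GiB

# A GROUPED_MAP pandas UDF ships each group as one record batch, so a
# single column whose values total more than this cannot be encoded
# with 32-bit offsets, hence per-group failures around the 2 GB mark.
{code}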
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973456#comment-16973456 ]

Abdeali Kothari commented on ARROW-4890:

We changed our pipeline to do it with joins and explodes, using Spark functions. It was a while back; I don't remember the exact specifics of that pipeline.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973270#comment-16973270 ]

SURESH CHAGANTI commented on ARROW-4890:

Thank you [~AbdealiJK]. May I ask what other approach you are using?
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16973076#comment-16973076 ]

Abdeali Kothari commented on ARROW-4890:

No, I had to stop using pandas UDFs due to this and find another approach for the transformations I need to do.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972882#comment-16972882 ]

SURESH CHAGANTI commented on ARROW-4890:

[~AbdealiJK] did you find any fix or workaround for this error?
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16972881#comment-16972881 ]

SURESH CHAGANTI commented on ARROW-4890:

Got it, thank you [~emkornfi...@gmail.com]. Is there any way we can increase that size? I am assuming the data that gets sent to the pandas_udf method is in the uncompressed format.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969714#comment-16969714 ]

Micah Kornfield commented on ARROW-4890:

Yes. I believe it is 2GB per shard currently.
[ https://issues.apache.org/jira/browse/ARROW-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966994#comment-16966994 ]

SURESH CHAGANTI commented on ARROW-4890:

[~emkornfi...@gmail.com] is there any size limit as to how much we can send to pandas_udf? I am also seeing the same error as above, and my groups are pretty large: around 200M records, roughly 2 to 4 GB in size.