RE: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread Gautham Acharya
Thanks for the quick reply, Zhang.

I don't think that we have too much data skew, and if we do, there isn't much 
of a way around it - we need to groupby this specific column in order to run 
aggregates. 

I'm running this with PySpark, and it doesn't look like the DataFrame groupBy() 
function takes a numPartitions parameter. What other options can I explore?
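
For context, the closest equivalents I can see are repartitioning on the grouping 
key before the groupBy, or raising spark.sql.shuffle.partitions - a minimal sketch 
(the input path and partition counts below are placeholders, not our real values):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("grouped-agg")
         # shuffle parallelism used by the groupBy aggregation; placeholder value
         .config("spark.sql.shuffle.partitions", "2000")
         .getOrCreate())

# placeholder path standing in for the real 50,000-column input
df = spark.read.parquet("/path/to/wide_matrix")

# alternatively, repartition on the grouping key before aggregating
df = df.repartition(2000, "label_col")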

--gautham

-----Original Message-----
From: ZHANG Wei  
Sent: Thursday, May 7, 2020 1:34 AM
To: Gautham Acharya 
Cc: user@spark.apache.org
Subject: Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

AFAICT, there might be data skew: some partitions got too many rows, which 
caused an out-of-memory failure. Trying .groupBy().count() or 
.aggregateByKey().count() may help check each partition's data size.
If there is no data skew, increasing the .groupBy() parameter `numPartitions` 
is worth a try.

--
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +
Gautham Acharya  wrote:

> Hi everyone,
>
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
>
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
>
> | label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_5 |
> |-----------|-----------|-----------|-----------|-----|-----------|
> | label_a   | 2.0       | 5.6       | 7.123     | ... | ...       |
> | label_b   | 11.0      | 1.4       | 2.345     | ... | ...       |
> | label_a   | 3.1       | 6.2       | 5.444     | ... | ...       |
>
>
>
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
>
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 
> (TID 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 377, in main
> process()
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 372, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 286, in dump_stream
> for series in iterator:
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 303, in load_stream
> for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in 
> pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
>
> Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
>
> I'm currently running this job on 4 nodes with 16 cores and 64 GB of memory 
> each.
>
> I've attached the full error log here as well. What are some workarounds that 
> I can do to get this job running? Unfortunately, we are running up to a 
> production release and this is becoming a severe blocker.
>
> Thanks,
> Gautham
>
>
>
>




Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread ZHANG Wei
AFAICT, there might be data skew: some partitions got too many rows,
which caused an out-of-memory failure. Trying .groupBy().count()
or .aggregateByKey().count() may help check each partition's data size.
If there is no data skew, increasing the .groupBy() parameter
`numPartitions` is worth a try.
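
For example, something like this (the column and path names are illustrative only) 
would show whether a few labels dominate the row counts:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
# placeholder path standing in for the real input
df = spark.read.parquet("/path/to/wide_matrix")

# row count per grouping key, largest groups first
df.groupBy("label_col").count().orderBy(F.desc("count")).show(20, truncate=False)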

-- 
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +
Gautham Acharya  wrote:

> Hi everyone,
> 
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
> 
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
> 
> | label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_5 |
> |-----------|-----------|-----------|-----------|-----|-----------|
> | label_a   | 2.0       | 5.6       | 7.123     | ... | ...       |
> | label_b   | 11.0      | 1.4       | 2.345     | ... | ...       |
> | label_a   | 3.1       | 6.2       | 5.444     | ... | ...       |
> 
> 
> 
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
> 
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 
> 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 377, in main
> process()
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 372, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 286, in dump_stream
> for series in iterator:
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 303, in load_stream
> for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in 
> pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
> 
> Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
> 
> I'm currently running this job on 4 nodes with 16 cores and 64 GB of memory 
> each.
> 
> I've attached the full error log here as well. What are some workarounds that 
> I can do to get this job running? Unfortunately, we are running up to a 
> production release and this is becoming a severe blocker.
> 
> Thanks,
> Gautham
> 
> 
> 
> 




PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-05 Thread Gautham Acharya
Hi everyone,

I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.

The GROUP BY function runs on a wide dataset. The first column of the dataset 
contains string labels that are GROUPed on. The remaining columns are numeric 
values that are aggregated in the Pandas UDF. The dataset is very wide, with 
50,000 columns and 3 million rows.

| label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_5 |
|-----------|-----------|-----------|-----------|-----|-----------|
| label_a   | 2.0       | 5.6       | 7.123     | ... | ...       |
| label_b   | 11.0      | 1.4       | 2.345     | ... | ...       |
| label_a   | 3.1       | 6.2       | 5.444     | ... | ...       |
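
For reference, a minimal sketch of the kind of grouped aggregation involved (the 
mean() here, the input path, and applying one UDF per column are stand-ins, not 
the actual implementation):

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
# placeholder path standing in for the real 50,000-column input
df = spark.read.parquet("/path/to/wide_matrix")

# GROUPED_AGG pandas UDF: receives one column's values for one group as a pandas Series
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def agg_mean(v):
    return v.mean()

num_cols = [c for c in df.columns if c != "label_col"]
result = df.groupBy("label_col").agg(*[agg_mean(F.col(c)).alias(c) for c in num_cols])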



My job runs fine on smaller datasets, with the same number of columns but fewer 
rows. However, when run on a dataset with 3 million rows, I see the following 
exception:

20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 
2358)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
 line 377, in main
process()
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
 line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
 line 286, in dump_stream
for series in iterator:
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
 line 303, in load_stream
for batch in reader:
  File "pyarrow/ipc.pxi", line 266, in __iter__
  File "pyarrow/ipc.pxi", line 282, in 
pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
looks like PyArrow has a 2GB limit for each shard that is sent to the grouping 
function.
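
As a rough back-of-the-envelope check (assuming 8-byte doubles and ignoring Arrow 
overhead), a single group only needs about 5,400 rows before 50,000 numeric columns 
exceed 2GB, so with 3 million rows even a modestly sized group blows past the limit:

# rough per-group Arrow batch size: rows_in_group * n_numeric_cols * 8 bytes (float64)
n_cols = 50_000
bytes_per_row = n_cols * 8                      # ~400 KB per row
rows_at_2gb = (2 * 1024**3) // bytes_per_row    # ~5,368 rows reaches 2 GiB
print(rows_at_2gb)

# e.g. 3,000,000 rows over ~100 distinct labels -> ~30,000 rows per group -> ~12 GB per group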

I'm currently running this job on 4 nodes with 16 cores and 64 GB of memory each.

I've attached the full error log here as well. What are some workarounds that I 
can do to get this job running? Unfortunately, we are running up to a 
production release and this is becoming a severe blocker.

Thanks,
Gautham