RE: PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-07 Thread Gautham Acharya
Thanks for the quick reply, Zhang.

I don't think we have too much data skew, and if we do, there isn't much of a 
way around it - we need to group by this specific column in order to run 
aggregates.

I'm running this with PySpark, and it doesn't look like the DataFrame groupBy() 
function takes a numPartitions parameter. What other options can I explore?
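For reference, a minimal sketch (not from the thread) of the DataFrame-level 
stand-ins for numPartitions; the input path and column name are placeholders, 
and neither knob helps if a single group is itself too large:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# DataFrame groupBy() has no numPartitions argument; the closest knob is the
# number of shuffle partitions used by the aggregation (default 200).
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# Placeholder input; "label_col" is the grouping column from the example
# table quoted further down.
df = spark.read.csv("s3://example-bucket/wide_matrix.csv",
                    header=True, inferSchema=True)

# Alternatively (or additionally), repartition on the grouping key up front.
df = df.repartition(2000, "label_col")

# Caveat: a grouped pandas UDF still receives each group as one Arrow batch,
# so an oversized group will fail regardless of the partition count.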

--gautham

-Original Message-
From: ZHANG Wei  
Sent: Thursday, May 7, 2020 1:34 AM
To: Gautham Acharya 
Cc: user@spark.apache.org
Subject: Re: PyArrow Exception in Pandas UDF GROUPEDAGG()



AFAICT, there might be data skew: some partitions got too many rows, which 
caused the out-of-memory failure. Trying .groupBy().count() or 
.aggregateByKey().count() may help check each partition's data size.
If there is no data skew, increasing the .groupBy() parameter `numPartitions` 
is worth a try.
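A sketch of that check, assuming the wide DataFrame `df` and the grouping 
column "label_col" from the example table quoted below:

# Per-group row counts, largest first; a handful of very large groups would
# point to skew as the cause of the oversized Arrow batches.
group_sizes = df.groupBy("label_col").count()
group_sizes.orderBy("count", ascending=False).show(20)

# Rough per-group payload: rows_in_group * 50,000 columns * 4-8 bytes each.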

--
Cheers,
-z

On Wed, 6 May 2020 00:07:58 +
Gautham Acharya  wrote:

> Hi everyone,
>
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
>
> The GROUP BY function runs on a wide dataset. The first column of the dataset 
> contains string labels that are GROUPed on. The remaining columns are numeric 
> values that are aggregated in the Pandas UDF. The dataset is very wide, with 
> 50,000 columns and 3 million rows.
>
> --
> | label_col | num_col_0 | num_col_1 | num_col_2 | --- | num_col_5 |
> | label_a   | 2.0       | 5.6       | 7.123     |     |           |
> | label_b   | 11.0      | 1.4       | 2.345     |     |           |
> | label_a   | 3.1       | 6.2       | 5.444     |     |           |
>
>
>
> My job runs fine on smaller datasets, with the same number of columns but 
> fewer rows. However, when run on a dataset with 3 million rows, I see the 
> following exception:
>
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 
> (TID 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 377, in main
> process()
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
>  line 372, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 286, in dump_stream
> for series in iterator:
>   File 
> "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
>  line 303, in load_stream
> for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in 
> pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
>
> Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
> looks like PyArrow has a 2GB limit for each shard that is sent to the 
> grouping function.
>
> I'm currently running this job on 4 nodes with 16 cores and 64GB of memory 
> each.
>
> I've attached the full error log here as well. What are some workarounds that 
> I can do to get this job running? Unfortunately, we are running up to a 
> production release and this is becoming a severe blocker.
>
> Thanks,
> Gautham
>
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



PyArrow Exception in Pandas UDF GROUPEDAGG()

2020-05-05 Thread Gautham Acharya
Hi everyone,

I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.

The GROUP BY function runs on a wide dataset. The first column of the dataset 
contains string labels that are GROUPed on. The remaining columns are numeric 
values that are aggregated in the Pandas UDF. The dataset is very wide, with 
50,000 columns and 3 million rows.

--
| label_col | num_col_0 | num_col_1 | num_col_2 | --- | num_col_5 |
| label_a   | 2.0       | 5.6       | 7.123     |     |           |
| label_b   | 11.0      | 1.4       | 2.345     |     |           |
| label_a   | 3.1       | 6.2       | 5.444     |     |           |
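For context, a minimal sketch of the kind of grouped-aggregation pandas UDF 
described above (Spark 2.4-style API); the mean() aggregate is a stand-in, not 
the actual job, and `df` is the wide DataFrame:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    # v is a pandas Series holding one numeric column for one group.
    return v.mean()

# One aggregate expression per numeric column; each group's data is shipped
# to the Python worker as Arrow batches, which is where the limit below bites.
agg_exprs = [mean_udf(df[c]).alias(c) for c in df.columns if c != "label_col"]
result = df.groupBy("label_col").agg(*agg_exprs)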



My job runs fine on smaller datasets, with the same number of columns but fewer 
rows. However, when run on a dataset with 3 million rows, I see the following 
exception:

20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 
2358)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
 line 377, in main
process()
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py",
 line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
 line 286, in dump_stream
for series in iterator:
  File 
"/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py",
 line 303, in load_stream
for batch in reader:
  File "pyarrow/ipc.pxi", line 266, in __iter__
  File "pyarrow/ipc.pxi", line 282, in 
pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

Looking at this issue (https://issues.apache.org/jira/browse/ARROW-4890), it 
looks like PyArrow has a 2GB limit for each shard that is sent to the grouping 
function.

I'm currently running this job on 4 nodes with 16 cores and 64GB of memory each.

I've attached the full error log here as well. What are some workarounds that I 
can do to get this job running? Unfortunately, we are running up to a 
production release and this is becoming a severe blocker.
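One hedged workaround sketch, not from the thread: since a grouped pandas UDF 
receives each group as a single Arrow batch, the batch can be shrunk by 
aggregating the 50,000 numeric columns in chunks and joining the per-chunk 
results back on the label column (`mean_udf` is the placeholder UDF sketched 
above, and the chunk size is an assumption to tune):

value_cols = [c for c in df.columns if c != "label_col"]
chunk_size = 5000
chunks = [value_cols[i:i + chunk_size]
          for i in range(0, len(value_cols), chunk_size)]

result = None
for chunk in chunks:
    proj = df.select("label_col", *chunk)
    part = proj.groupBy("label_col").agg(
        *[mean_udf(proj[c]).alias(c) for c in chunk])
    # Stitch the chunked aggregates back into one wide result.
    result = part if result is None else result.join(part, on="label_col")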

Thanks,
Gautham






[PySpark] How to write HFiles as an 'append' to the same directory?

2020-03-14 Thread Gautham Acharya
I have a process in Apache Spark that attempts to write HFiles to S3 in 
batches. I want the resulting HFiles in the same directory, as they are in the 
same column family. However, I'm getting a 'directory already exists' error 
when I try to run this on AWS EMR. How can I write HFiles via Spark as an 
'append', like I can do with a CSV?

The batch writing function looks like this:

for col_group in split_cols:
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)

    hfile_writer.write_hfiles(processed_chunk, output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)
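A hedged sketch, not the author's solution, of one way to emulate an append 
within that loop: FileOutputFormat refuses to write into an existing directory, 
so each batch gets its own subdirectory under output_path (the chunk-naming 
scheme is an assumption) and the HBase bulk-load step is pointed at the parent:

for i, col_group in enumerate(split_cols):
    processed_chunk = batch_write_pandas_udf_for_col_aggregation(
        joined_dataframe, col_group, pandas_udf_func, group_by_args)

    # A unique directory per batch avoids the FileAlreadyExistsException; the
    # HFiles can still be bulk-loaded into the same column family afterwards.
    chunk_output_path = "{}/chunk_{:05d}".format(output_path, i)
    hfile_writer.write_hfiles(processed_chunk, chunk_output_path,
                              zookeeper_ip, table_name,
                              constants.DEFAULT_COL_FAMILY)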

The actual function to write the HFiles is this:

rdd.saveAsNewAPIHadoopFile(output_path,
                           constants.OUTPUT_FORMAT_CLASS,
                           keyClass=constants.KEY_CLASS,
                           valueClass=constants.VALUE_CLASS,
                           keyConverter=constants.KEY_CONVERTER,
                           valueConverter=constants.VALUE_CONVERTER,
                           conf=conf)

The exception I'm getting:


Called with arguments: Namespace(job_args=['matrix_path=/tmp/matrix.csv', 
'metadata_path=/tmp/metadata.csv', 
'output_path=s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles',
 'group_by_args=cluster_id', 'zookeeper_ip=ip-172-30-5-36.ec2.internal', 
'table_name=test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a'], 
job_name='matrix_transformations')

job_args_tuples: [['matrix_path', '/tmp/matrix.csv'], ['metadata_path', 
'/tmp/metadata.csv'], ['output_path', 
's3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles'],
 ['group_by_args', 'cluster_id'], ['zookeeper_ip', 
'ip-172-30-5-36.ec2.internal'], ['table_name', 
'test_wide_matrix__8c6b2f09-2ba0-42fc-af8e-656188c0186a']]

Traceback (most recent call last):

  File "/mnt/var/lib/hadoop/steps/s-2ZIOR335HH9TR/main.py", line 56, in 

job_module.transform(spark, **job_args)

  File 
"/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py",
 line 93, in transform

  File 
"/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/jobs/matrix_transformations/job.py",
 line 73, in write_split_columnwise_transform

  File 
"/mnt/tmp/spark-745d355c-0a90-4992-a07c-bc1dde4fac3e/userFiles-33d65b1c-bdae-4b44-997e-4d4c7521ad96/dep.zip/src/spark_transforms/pyspark_jobs/output_handler/hfile_writer.py",
 line 44, in write_hfiles

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1438, in 
saveAsNewAPIHadoopFile

  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", 
line 1257, in __call__

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, 
in deco

  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 
328, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.

: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory 
s3://gauthama-etl-pipelines-test-working-bucket/8c6b2f09-2ba0-42fc-af8e-656188c0186a/hfiles/median
 already exists

at 
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)

at 
org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:393)

at 
org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)

at 
org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:1000)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)

at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopFile$2.apply(PairRDDFunctions.scala:991)

at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at 

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
Users can also request random rows within those columns, so a user can request 
a subset of the matrix (N rows and N columns), which would change the value of 
the correlation coefficient.

From: Jerry Vinokurov [mailto:grapesmo...@gmail.com]
Sent: Wednesday, July 17, 2019 1:27 PM
To: user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Maybe I'm not understanding something about this use case, but why is 
precomputation not an option? Is it because the matrices themselves change? 
Because if the matrices are constant, then I think precomputation would work 
for you even if the users request random correlations. You can just store the 
resulting column with the matrix id, row, and column as the key for retrieval.

My general impression is that while you could do this in Spark, it's probably 
not the correct framework for carrying out this kind of operation. This feels 
more like a job for something like OpenMP than for Spark.


On Wed, Jul 17, 2019 at 3:42 PM Gautham Acharya 
<gauth...@alleninstitute.org> wrote:
As I said in my initial message, precomputing is not an option.

Retrieving only the top/bottom N most correlated is an option – would that 
speed up the results?

Our SLAs are soft – slight variations (+- 15 seconds) will not cause issues.

--gautham
From: Patrick McCarthy [mailto:pmccar...@dstillery.com]
Sent: Wednesday, July 17, 2019 12:39 PM
To: Gautham Acharya <gauth...@alleninstitute.org>
Cc: Bobby Evans <reva...@gmail.com>; Steven Stetzler 
<steven.stetz...@gmail.com>; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Do you really need the results of all 3MM computations, or only the top- and 
bottom-most correlation coefficients? Could correlations be computed on a 
sample and from that estimate a distribution of coefficients? Would it make 
sense to precompute offline and instead focus on fast key-value retrieval, like 
ElasticSearch or ScyllaDB?

Spark is a compute framework rather than a serving backend, I don't think it's 
designed with retrieval SLAs in mind and you may find those SLAs difficult to 
maintain.

On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya 
<gauth...@alleninstitute.org> wrote:
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler <steven.stetz...@gmail.com>
Cc: Gautham Acharya <gauth...@alleninstitute.org>; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.
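For reference, the arithmetic behind that figure (in binary gigabytes):

rows, cols, bytes_per_int = 3_000_000, 50_000, 4
total_bytes = rows * cols * bytes_per_int   # 600,000,000,000 bytes
print(total_bytes / 2**30)                  # ~558.8 GiB, i.e. roughly 560 GB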

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio, 
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, b

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
As I said in my initial message, precomputing is not an option.

Retrieving only the top/bottom N most correlated is an option – would that 
speed up the results?

Our SLAs are soft – slight variations (+- 15 seconds) will not cause issues.

--gautham
From: Patrick McCarthy [mailto:pmccar...@dstillery.com]
Sent: Wednesday, July 17, 2019 12:39 PM
To: Gautham Acharya 
Cc: Bobby Evans ; Steven Stetzler 
; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Do you really need the results of all 3MM computations, or only the top- and 
bottom-most correlation coefficients? Could correlations be computed on a 
sample and from that estimate a distribution of coefficients? Would it make 
sense to precompute offline and instead focus on fast key-value retrieval, like 
ElasticSearch or ScyllaDB?

Spark is a compute framework rather than a serving backend, I don't think it's 
designed with retrieval SLAs in mind and you may find those SLAs difficult to 
maintain.

On Wed, Jul 17, 2019 at 3:14 PM Gautham Acharya 
<gauth...@alleninstitute.org> wrote:
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler <steven.stetz...@gmail.com>
Cc: Gautham Acharya <gauth...@alleninstitute.org>; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio, 
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, but a 5 
second SLA is very tight for this amount of data.

Can you make this work with Spark? Probably. Does Spark have something built 
in that will make this fast and simple for you? I doubt it; you have some very 
tight requirements and will likely have to write something custom to make it 
work the way you want.


On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
<steven.stetz...@gmail.com> wrote:
Hi Gautham,

I am a beginner spark user too and I may not have a complete understanding of 
your question, but I thought I would start a discussion anyway. Have you looked 
into using Spark's built in Correlation function? 
(https://spark.apache.org/docs/latest/ml-statistics.html)
 This might let you get what you want (per-row correlation against the same 
matrix) without having to deal with parallelizing the computation yourself. 
Also, I think the question of how quickly you can get your results is largely 
a data-access question rather than a how-fast-is-Spark question. As long as 
you can exploit 
data parallelism (i.e. you can partition up your data), Spark will give you a 
speedup. You can imagine that if you had a large machine with many cores and 
~100 GB of RAM (e.g. a m5.12xlarge EC2 inst

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-17 Thread Gautham Acharya
Thanks for the reply, Bobby.

I’ve received notice that we can probably tolerate response times of up to 30 
seconds. Would this be more manageable? 5 seconds was an initial ask, but 20-30 
seconds is also a reasonable response time for our use case.

With the new SLA, do you think that we can easily perform this computation in 
spark?
--gautham

From: Bobby Evans [mailto:reva...@gmail.com]
Sent: Wednesday, July 17, 2019 7:06 AM
To: Steven Stetzler 
Cc: Gautham Acharya ; user@spark.apache.org
Subject: Re: [Beginner] Run compute on large matrices and return the result in 
seconds?


Let's do a few quick rules of thumb to get an idea of what kind of processing 
power you will need in general to do what you want.

You need 3,000,000 ints by 50,000 rows.  Each int is 4 bytes so that ends up 
being about 560 GB that you need to fully process in 5 seconds.

If you are reading this from spinning disks (which average about 80 MB/s) you 
would need at least 1,450 disks to just read the data in 5 seconds (that number 
can vary a lot depending on the storage format and your compression ratio).
If you are reading the data over a network (let's say 10GigE even though in 
practice you cannot get that in the cloud easily) you would need about 90 NICs 
just to read the data in 5 seconds; again, depending on the compression ratio, 
this may be lower.
If you assume you have a cluster where it all fits in main memory and have 
cached all of the data in memory (which in practice I have seen on most modern 
systems at somewhere between 12 and 16 GB/sec) you would need between 7 and 10 
machines just to read through the data once in 5 seconds.  Spark also stores 
cached data compressed so you might need less as well.

All the numbers fit with things that spark should be able to handle, but a 5 
second SLA is very tight for this amount of data.

Can you make this work with Spark? Probably. Does Spark have something built 
in that will make this fast and simple for you? I doubt it; you have some very 
tight requirements and will likely have to write something custom to make it 
work the way you want.


On Thu, Jul 11, 2019 at 4:12 PM Steven Stetzler 
<steven.stetz...@gmail.com> wrote:
Hi Gautham,

I am a beginner spark user too and I may not have a complete understanding of 
your question, but I thought I would start a discussion anyway. Have you looked 
into using Spark's built in Correlation function? 
(https://spark.apache.org/docs/latest/ml-statistics.html)
 This might let you get what you want (per-row correlation against the same 
matrix) without having to deal with parallelizing the computation yourself. 
Also, I think the question of how quickly you can get your results is largely 
a data-access question rather than a how-fast-is-Spark question. As long as 
you can exploit data parallelism (i.e. you can partition up your data), Spark 
will give you a speedup. You can imagine that if you had a large machine with 
many cores and ~100 GB of RAM (e.g. an m5.12xlarge EC2 instance), you could 
fit your problem in main memory and perform your computation with thread-based 
parallelism. This 
might get your result relatively quickly. For a dedicated application with well 
constrained memory and compute requirements, it might not be a bad option to do 
everything on one machine as well. Accessing an external database and 
distributing work over a large number of computers can add overhead that might 
be out of your control.
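A hedged sketch of the built-in correlation mentioned above 
(pyspark.ml.stat.Correlation); the column names are assumptions based on the 
wide-matrix description earlier in the thread, and note that at 50,000 columns 
the resulting 50,000 x 50,000 matrix collected on the driver would itself be 
enormous:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# df: the wide label + numeric-columns DataFrame described in the thread.
num_cols = [c for c in df.columns if c != "label_col"]
assembler = VectorAssembler(inputCols=num_cols, outputCol="features")
vectorized = assembler.transform(df).select("features")

# Returns a one-row DataFrame whose single value is the Pearson correlation
# matrix of all assembled columns.
corr_matrix = Correlation.corr(vectorized, "features").head()[0]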

Thanks,
Steven

On Thu, Jul 11, 2019 at 9:24 AM Gautham Acharya 
<gauth...@alleninstitute.org> wrote:
Ping? I would really appreciate advice on this! Thank you!

From: Gautham Acharya
Sent: Tuesday, July 9, 2019 4:22 PM
To: user@spark.apache.org
Subject: [Beginner] Run compute on large matrices and return the result in 
seconds?


This is my first email to this mailing list, so I apologize if I made any 
errors.



My team's going to be building an application and I'm investigating some 
options for distributed compute systems. We want to perform computations on 
large matrices.



The requirements are as follows:



1. The matrices can be expected to be up to 50,000 columns x 3 million 
rows. The values are all integers (except for the row/column headers).

2. The application needs to select a specific row, and calculate the 
correlation coefficient ( 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html

RE: [Beginner] Run compute on large matrices and return the result in seconds?

2019-07-11 Thread Gautham Acharya
Ping? I would really appreciate advice on this! Thank you!

From: Gautham Acharya
Sent: Tuesday, July 9, 2019 4:22 PM
To: user@spark.apache.org
Subject: [Beginner] Run compute on large matrices and return the result in 
seconds?


This is my first email to this mailing list, so I apologize if I made any 
errors.



My team's going to be building an application and I'm investigating some 
options for distributed compute systems. We want to perform computations on 
large matrices.



The requirements are as follows:



1. The matrices can be expected to be up to 50,000 columns x 3 million 
rows. The values are all integers (except for the row/column headers).

2. The application needs to select a specific row, and calculate the 
correlation coefficient ( 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html
 ) against every other row. This means up to 3 million different calculations.

3. A sorted list of the correlation coefficients and their corresponding 
row keys need to be returned in under 5 seconds.

4. Users will eventually request random row/column subsets to run 
calculations on, so precomputing our coefficients is not an option. This needs 
to be done on request.



I've been looking at many compute solutions, but I'd consider Spark first due 
to its widespread use and community. I currently have my data loaded into 
Apache HBase for a different scenario (random access of rows/columns). I've 
naively tried loading a dataframe from the CSV using a Spark instance hosted on 
AWS EMR, but getting the results for even a single correlation takes over 20 
seconds.



Thank you!


--gautham



[Beginner] Run compute on large matrices and return the result in seconds?

2019-07-09 Thread Gautham Acharya
This is my first email to this mailing list, so I apologize if I made any 
errors.



My team's going to be building an application and I'm investigating some 
options for distributed compute systems. We want to perform computations on 
large matrices.



The requirements are as follows:



1. The matrices can be expected to be up to 50,000 columns x 3 million 
rows. The values are all integers (except for the row/column headers).

2. The application needs to select a specific row, and calculate the 
correlation coefficient ( 
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.corr.html
 ) against every other row. This means up to 3 million different calculations.

3. A sorted list of the correlation coefficients and their corresponding 
row keys need to be returned in under 5 seconds.

4. Users will eventually request random row/column subsets to run 
calculations on, so precomputing our coefficients is not an option. This needs 
to be done on request.



I've been looking at many compute solutions, but I'd consider Spark first due 
to its widespread use and community. I currently have my data loaded into 
Apache HBase for a different scenario (random access of rows/columns). I've 
naively tried loading a dataframe from the CSV using a Spark instance hosted on 
AWS EMR, but getting the results for even a single correlation takes over 20 
seconds.
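As an illustrative, hedged sketch of the computation in requirement 2 (one 
row's Pearson correlation against every other row), here is a single-machine, 
vectorized version; `matrix` and `row_index` are placeholders, and this says 
nothing about whether the 5-second target is reachable at 3 million x 50,000:

import numpy as np

def correlate_row_against_all(matrix, row_index):
    # matrix: (n_rows, n_cols) numeric array; returns row indices sorted by
    # Pearson correlation with matrix[row_index], most correlated first.
    centered = matrix - matrix.mean(axis=1, keepdims=True)
    target = centered[row_index]
    norms = np.linalg.norm(centered, axis=1) * np.linalg.norm(target)
    coeffs = centered @ target / norms
    order = np.argsort(coeffs)[::-1]
    return order, coeffs[order]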



Thank you!


--gautham