Re: Sparse Matrix Storage Consumption Issue

2017-05-08 Thread Matthias Boehm
quick update: The poor runtime of scenario (3) is now fixed in master. The
reasons were unnecessary shuffle and load imbalance for Spark rexpand
operations with a small input vector and a large, ultra-sparse output matrix.
Thanks for pointing this out, Mingyang.

Regards,
Matthias


Re: Sparse Matrix Storage Consumption Issue

2017-05-08 Thread Matthias Boehm
ok thanks for sharing - I'll have a look later this week.

Regards,
Matthias


Re: Sparse Matrix Storage Consumption Issue

2017-05-08 Thread Mingyang Wang
Hi Matthias,

With a driver memory of 10GB, all operations were executed on CP, and I did
observe that the version that reads FK as a vector and then converts it
was faster: it took 8.337s (6.246s on GC), while the version that reads
FK as a matrix took 31.680s (26.256s on GC).

For the distributed caching, I have re-run all scripts with the following
Spark configuration:

--driver-memory 1G \
--executor-memory 100G \
--executor-cores 20 \
--num-executors 1 \
--conf spark.driver.maxResultSize=0 \
--conf spark.rpc.message.maxSize=128 \

And it seems that both versions have some problems.

1) Sum of FK in matrix form
```
FK = read($FK)
print("Sum of FK = " + sum(FK))
```
Worked as expected. Took 8.786s.


2) Sum of FK in matrix form, with checkpoints
```
FK = read($FK)
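# the empty if-block below cuts the script into separate DAGs, so the
# compiler injects a Spark checkpoint (sp_chkpoint) that pins FK in memory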
if (1 == 1) {}
print("Sum of FK = " + sum(FK))
```
It took 89.731s, with detailed stats shown below.

17/05/08 13:15:00 INFO api.ScriptExecutorUtils: SystemML Statistics:
Total elapsed time: 91.619 sec.
Total compilation time: 1.889 sec.
Total execution time:   89.731 sec.
Number of compiled Spark inst:  2.
Number of executed Spark inst:  2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.001/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time: 0.000 sec.
Spark ctx create time (lazy):   0.895 sec.
Spark trans counts (par,bc,col): 0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 5.001 sec.
Total JVM GC count: 8.
Total JVM GC time:  0.161 sec.
Heavy hitter instructions (name, time, count):
-- 1)   sp_uak+ 89.349 sec  1
-- 2)   sp_chkpoint 0.381 sec   1
-- 3)   ==  0.001 sec   1
-- 4)   +   0.000 sec   1
-- 5)   print   0.000 sec   1
-- 6)   castdts 0.000 sec   1
-- 7)   createvar   0.000 sec   3
-- 8)   rmvar   0.000 sec   7
-- 9)   assignvar   0.000 sec   1
-- 10)  cpvar   0.000 sec   1


3) Sum of FK in vector form
```
FK_colvec = read($FK_colvec)
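# rexpand: expand the 2e7 x 1 vector into a 2e7 x 1e6 ultra-sparse
# matrix with a single non-zero per row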
FK = table(seq(1,nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)
print("Sum of FK = " + sum(FK))
```
Things really went wrong. It took ~10 mins.

17/05/08 13:26:36 INFO api.ScriptExecutorUtils: SystemML Statistics:
Total elapsed time: 605.688 sec.
Total compilation time: 1.857 sec.
Total execution time:   603.832 sec.
Number of compiled Spark inst:  2.
Number of executed Spark inst:  2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/1.
HOP DAGs recompile time: 0.002 sec.
Spark ctx create time (lazy):   0.858 sec.
Spark trans counts (par,bc,col): 0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 3.682 sec.
Total JVM GC count: 5.
Total JVM GC time:  0.064 sec.
Heavy hitter instructions (name, time, count):
-- 1)   sp_uak+ 603.447 sec 1
-- 2)   sp_rexpand  0.381 sec   1
-- 3)   createvar   0.002 sec   3
-- 4)   rmvar   0.000 sec   5
-- 5)   +   0.000 sec   1
-- 6)   print   0.000 sec   1
-- 7)   castdts 0.000 sec   1

Also, from the executor log, there was some disk spilling:

17/05/08 13:20:00 INFO ExternalSorter: Thread 109 spilling in-memory
map of 33.8 GB to disk (1 time so far)
17/05/08 13:20:20 INFO ExternalSorter: Thread 116 spilling in-memory
map of 31.2 GB to disk (1 time so far)

...

17/05/08 13:24:50 INFO ExternalAppendOnlyMap: Thread 116 spilling
in-memory map of 26.9 GB to disk (1 time so far)
17/05/08 13:25:08 INFO ExternalAppendOnlyMap: Thread 109 spilling
in-memory map of 26.6 GB to disk (1 time so far)



Regards,
Mingyang


Re: Sparse Matrix Storage Consumption Issue

2017-05-06 Thread Matthias Boehm
yes, even with the previous patch for improved memory efficiency of
ultra-sparse matrices in MCSR format, there is still some unnecessary
overhead that leads to garbage collection. For this reason, I would
recommend reading it as a vector and converting it in memory to an
ultra-sparse matrix. I also just pushed a minor performance improvement
for reading ultra-sparse matrices, but the major bottleneck still exists.

The core issue is that we can't read these ultra-sparse matrices directly
into a CSR representation because CSR does not allow for efficient
incremental construction (with unordered inputs and multi-threaded read).
However, I created SYSTEMML-1587 to solve this in the general case. The
idea is to read ultra-sparse matrices into thread-local COO deltas and
finally merge them into a CSR representation. The initial results are very
promising, and it's safe because the temporary memory requirements are
covered by the MCSR estimate, but it will take a while because I want to
introduce this consistently for all readers (single-/multi-threaded, all
formats).
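
As a rough illustration, here is a minimal Java sketch of that merge
(hypothetical names like CooDelta and Csr, simplified to a single-threaded
merge; this is not the actual SystemML code): each reader thread appends its
cells to a thread-local COO delta, and a final pass counts non-zeros per row,
computes prefix sums, and scatters the cells into the CSR arrays.

```
import java.util.Arrays;
import java.util.List;

// Sketch only: CooDelta/Csr are hypothetical names, not SystemML classes.
public class CooToCsrSketch {
  // thread-local COO delta: parallel arrays of (row, col, value) triples
  static final class CooDelta {
    int[] rows = new int[16]; int[] cols = new int[16];
    double[] vals = new double[16]; int size = 0;
    void append(int r, int c, double v) {
      if (size == rows.length) { // amortized growth
        rows = Arrays.copyOf(rows, size * 2);
        cols = Arrays.copyOf(cols, size * 2);
        vals = Arrays.copyOf(vals, size * 2);
      }
      rows[size] = r; cols[size] = c; vals[size++] = v;
    }
  }

  // target CSR representation: row pointers, column indices, values
  static final class Csr {
    final int[] rowPtr, colIdx; final double[] values;
    Csr(int[] rp, int[] ci, double[] v) { rowPtr = rp; colIdx = ci; values = v; }
  }

  // merge all thread-local deltas into one CSR block
  static Csr merge(List<CooDelta> deltas, int numRows) {
    // 1) count non-zeros per row across all deltas
    int[] rowPtr = new int[numRows + 1];
    int nnz = 0;
    for (CooDelta d : deltas) {
      for (int i = 0; i < d.size; i++)
        rowPtr[d.rows[i] + 1]++;
      nnz += d.size;
    }
    // 2) prefix sums turn per-row counts into CSR row pointers
    for (int r = 1; r <= numRows; r++)
      rowPtr[r] += rowPtr[r - 1];
    // 3) scatter cells into their row segments (a real implementation
    //    would also sort column indices within each row)
    int[] colIdx = new int[nnz];
    double[] values = new double[nnz];
    int[] pos = Arrays.copyOf(rowPtr, numRows); // next free slot per row
    for (CooDelta d : deltas)
      for (int i = 0; i < d.size; i++) {
        int p = pos[d.rows[i]]++;
        colIdx[p] = d.cols[i];
        values[p] = d.vals[i];
      }
    return new Csr(rowPtr, colIdx, values);
  }
}
```

The point of the deltas is that each reader thread can append cells in
arbitrary order without synchronization; the CSR arrays are then built in one
pass, which is exactly what incremental CSR insertion with unordered input
cannot do efficiently.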

In contrast to the read issue, I was not able to reproduce the described
performance issue of distributed caching. Could you please double-check
that this test also used the current master build, and perhaps share the
detailed setup again (e.g., number of executors, data distribution, etc.)? Thanks.

Regards,
Matthias



Re: Sparse Matrix Storage Consumption Issue

2017-05-04 Thread Mingyang Wang
Out of curiosity, I increased the driver memory to 10GB, and then all
operations were executed on CP. It took 37.166s, but JVM GC took 30.534s. I
was wondering whether this is the expected behavior.

Total elapsed time: 38.093 sec.
Total compilation time: 0.926 sec.
Total execution time: 37.166 sec.
Number of compiled Spark inst: 0.
Number of executed Spark inst: 0.
Cache hits (Mem, WB, FS, HDFS): 0/0/0/1.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 30.400/0.000/0.001/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time: 0.000 sec.
Spark ctx create time (lazy): 0.000 sec.
Spark trans counts (par,bc,col): 0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 22.302 sec.
Total JVM GC count: 11.
Total JVM GC time: 30.534 sec.
Heavy hitter instructions (name, time, count):
-- 1) uak+ 37.166 sec 1
-- 2) == 0.001 sec 1
-- 3) + 0.000 sec 1
-- 4) print 0.000 sec 1
-- 5) rmvar 0.000 sec 5
-- 6) createvar 0.000 sec 1
-- 7) assignvar 0.000 sec 1
-- 8) cpvar 0.000 sec 1

Regards,
Mingyang


Re: Sparse Matrix Storage Consumption Issue

2017-05-04 Thread Mingyang Wang
Hi Matthias,

Thanks for the patch.

I have re-run the experiment and observed that there was indeed no more
memory pressure, but it still took ~90s for this simple script. I was
wondering what the bottleneck is in this case.


Total elapsed time: 94.800 sec.
Total compilation time: 1.826 sec.
Total execution time: 92.974 sec.
Number of compiled Spark inst: 2.
Number of executed Spark inst: 2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time: 0.000 sec.
Spark ctx create time (lazy): 0.860 sec.
Spark trans counts (par,bc,col): 0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 3.498 sec.
Total JVM GC count: 5.
Total JVM GC time: 0.064 sec.
Heavy hitter instructions (name, time, count):
-- 1) sp_uak+ 92.597 sec 1
-- 2) sp_chkpoint 0.377 sec 1
-- 3) == 0.001 sec 1
-- 4) print 0.000 sec 1
-- 5) + 0.000 sec 1
-- 6) castdts 0.000 sec 1
-- 7) createvar 0.000 sec 3
-- 8) rmvar 0.000 sec 7
-- 9) assignvar 0.000 sec 1
-- 10) cpvar 0.000 sec 1

Regards,
Mingyang



Re: Sparse Matrix Storage Consumption Issue

2017-05-03 Thread Matthias Boehm
to summarize, this was an issue of selecting serialized representations 
for large ultra-sparse matrices. Thanks again for sharing your feedback 
with us.


1) In-memory representation: In CSR, every non-zero requires 12 bytes -
that is 240MB in your case. The overall memory consumption, however,
depends on the distribution of non-zeros: in CSR, each block with at
least one non-zero requires 4KB for row pointers. Assuming a uniform
distribution (the worst case), this gives us 80GB, which is likely the
problem here. Every empty block would have an overhead of 44 bytes, but
under the worst-case assumption there are no empty blocks left. We do not
use COO for checkpoints because it would slow down subsequent operations.


2) Serialized/on-disk representation: For sparse datasets that are
expected to exceed aggregate memory, we used to use a serialized
representation (with storage level MEM_AND_DISK_SER) which uses sparse,
ultra-sparse, or empty representations. In this form, ultra-sparse
blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes.
Therefore, with this representation selected, your dataset should
easily fit in aggregate memory. Also, note that chkpoint is only a
transformation that persists the RDD; the subsequent operation then
pulls the data into memory.
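
To make those numbers concrete, here is a quick back-of-the-envelope
calculation in Java (assuming SystemML's default 1000 x 1000 blocking of the
2e7 x 1e6 matrix with 2e7 non-zeros from this thread; under the uniform worst
case that is roughly one non-zero per block and no empty blocks):

```
// Back-of-the-envelope estimates; the 1000 x 1000 block size is an assumption.
public class SparsityEstimates {
  public static void main(String[] args) {
    long nnz = 20_000_000L;                                   // 2e7 non-zeros
    long blocks = (20_000_000L / 1000) * (1_000_000L / 1000); // 2e7 blocks

    // (1) CSR in-memory: 12 bytes per non-zero, plus ~4KB of row pointers
    //     (1000 rows x 4 bytes) for every block with at least one non-zero
    System.out.printf("CSR cells:    %.2f GB%n", nnz * 12 / 1e9);      // ~0.24 GB
    System.out.printf("CSR row ptrs: %.1f GB%n", blocks * 4000 / 1e9); // ~80 GB

    // (2) serialized ultra-sparse: 9 + 16*nnz bytes per block (~1 nnz each)
    System.out.printf("serialized:   %.2f GB%n", blocks * (9 + 16) / 1e9); // ~0.5 GB
  }
}
```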


At a high level, this was a bug: we missed ultra-sparse representations
when introducing an improvement that stores sparse matrices in MCSR
format as CSR on checkpoints, which eliminated the need for a
serialized storage level. I just delivered a fix. Now we store such
ultra-sparse matrices again in serialized form, which should
significantly reduce the memory pressure.


Regards,
Matthias

On 5/3/2017 9:38 AM, Mingyang Wang wrote:

Hi all,

I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one
non-zero value in each row, i.e., 2e7 non-zero values in total.

With driver memory of 1GB and executor memory of 100GB, I found that the HOP
"Spark chkpoint", which is used to pin the FK matrix in memory, is really
expensive, as it invokes lots of disk operations.

FK is stored in binary format in 24 blocks; each block is ~45MB, ~1GB
in total.

For example, with the script as

"""
FK = read($FK)
print("Sum of FK = " + sum(FK))
"""

things worked fine, and it took ~8s.

While with the script as

"""
FK = read($FK)
if (1 == 1) {}
print("Sum of FK = " + sum(FK))
"""

things changed. It took ~92s, and I observed lots of disk spills in the logs.
Based on the stats from the Spark UI, it seems the materialized FK requires
>54GB of storage and thus introduces disk operations.


I was wondering: is this the expected behavior for a super sparse matrix?


Regards,
Mingyang