Re: Sparse Matrix Storage Consumption Issue
Quick update: the poor runtime of scenario (3) is now fixed in master. The causes were an unnecessary shuffle and load imbalance in Spark rexpand operations with a small input vector and a large, ultra-sparse output matrix. Thanks for pointing this out, Mingyang.

Regards,
Matthias

On Mon, May 8, 2017 at 3:09 PM, Matthias Boehm wrote:
> ok thanks for sharing - I'll have a look later this week.
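The update does not describe the fix itself. As a hypothetical illustration only (not SystemML code), the Python sketch below shows the block structure of such an expansion, assuming 1000 x 1000 blocks; the helper name `output_block_keys` is made up. Each output block receives cells from exactly one input row block, so in principle no data has to be exchanged between input blocks. At the same time, a single small input block can fan out into hundreds of ultra-sparse output blocks, which is one way load imbalance can arise.

```python
# Hypothetical sketch (not SystemML code): which output blocks one input row
# block touches when a column vector of column indices is expanded into an
# ultra-sparse matrix, e.g. via table(seq(1, n), FK_colvec, n, m).
# Assumes square 1000 x 1000 blocks.
import random

BLEN = 1000  # assumed block size (rows and columns per block)


def output_block_keys(col_indices, row_offset=0, blen=BLEN):
    """Return the set of 0-based (row_block, col_block) keys that receive a
    non-zero from one input row block.

    col_indices[i] is the 1-based output column of the 0-based output row
    row_offset + i.
    """
    keys = set()
    for i, col in enumerate(col_indices):
        row = row_offset + i
        keys.add((row // blen, (int(col) - 1) // blen))
    return keys


if __name__ == "__main__":
    random.seed(42)
    n_cols = 1_000_000
    # one input row block: 1000 random column indices in [1, n_cols]
    block = [random.randint(1, n_cols) for _ in range(BLEN)]
    keys = output_block_keys(block)
    # all keys share row block 0: these output blocks depend on this input block only
    print("row blocks touched:", {r for r, _ in keys})
    # but one small input block fans out to many distinct output blocks
    print("output blocks from one input block:", len(keys))
```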
Re: Sparse Matrix Storage Consumption Issue
OK, thanks for sharing. I'll have a look later this week.

Regards,
Matthias

On Mon, May 8, 2017 at 2:20 PM, Mingyang Wang wrote:
> Hi Matthias,
>
> With a driver memory of 10GB, all operations were executed on CP, and I did
> observe that the version of reading FK as a vector and then converting it
> was faster, which took 8.337s (6.246s on GC) while the version of reading
> FK as a matrix took 31.680s (26.256s on GC).
Re: Sparse Matrix Storage Consumption Issue
Hi Matthias,

With a driver memory of 10GB, all operations were executed on CP, and I did observe that reading FK as a vector and then converting it was faster: it took 8.337s (6.246s in GC), while reading FK as a matrix took 31.680s (26.256s in GC).

For distributed caching, I re-ran all scripts with the following Spark configuration:

--driver-memory 1G \
--executor-memory 100G \
--executor-cores 20 \
--num-executors 1 \
--conf spark.driver.maxResultSize=0 \
--conf spark.rpc.message.maxSize=128 \

It seems that both versions have some problems.

1) Sum of FK in matrix form
```
FK = read($FK)
print("Sum of FK = " + sum(FK))
```
This worked as expected and took 8.786s.

2) Sum of FK in matrix form, with checkpoints
```
FK = read($FK)
if (1 == 1) {}
print("Sum of FK = " + sum(FK))
```
This took 89.731s, with the detailed stats shown below.

17/05/08 13:15:00 INFO api.ScriptExecutorUtils: SystemML Statistics:
Total elapsed time: 91.619 sec.
Total compilation time: 1.889 sec.
Total execution time: 89.731 sec.
Number of compiled Spark inst: 2.
Number of executed Spark inst: 2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS):0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.001/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time:0.000 sec.
Spark ctx create time (lazy): 0.895 sec.
Spark trans counts (par,bc,col):0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 5.001 sec.
Total JVM GC count: 8.
Total JVM GC time: 0.161 sec.
Heavy hitter instructions (name, time, count):
-- 1) sp_uak+ 89.349 sec 1
-- 2) sp_chkpoint 0.381 sec 1
-- 3) == 0.001 sec 1
-- 4) + 0.000 sec 1
-- 5) print 0.000 sec 1
-- 6) castdts 0.000 sec 1
-- 7) createvar 0.000 sec 3
-- 8) rmvar 0.000 sec 7
-- 9) assignvar 0.000 sec 1
-- 10) cpvar 0.000 sec 1

3) Sum of FK in vector form
```
FK_colvec = read($FK_colvec)
FK = table(seq(1,nrow(FK_colvec)), FK_colvec, nrow(FK_colvec), 1e6)
print("Sum of FK = " + sum(FK))
```
Things really went wrong here. It took ~10 minutes.

17/05/08 13:26:36 INFO api.ScriptExecutorUtils: SystemML Statistics:
Total elapsed time: 605.688 sec.
Total compilation time: 1.857 sec.
Total execution time: 603.832 sec.
Number of compiled Spark inst: 2.
Number of executed Spark inst: 2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS):0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/1.
HOP DAGs recompile time:0.002 sec.
Spark ctx create time (lazy): 0.858 sec.
Spark trans counts (par,bc,col):0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 3.682 sec.
Total JVM GC count: 5.
Total JVM GC time: 0.064 sec.
Heavy hitter instructions (name, time, count):
-- 1) sp_uak+ 603.447 sec 1
-- 2) sp_rexpand 0.381 sec 1
-- 3) createvar 0.002 sec 3
-- 4) rmvar 0.000 sec 5
-- 5) + 0.000 sec 1
-- 6) print 0.000 sec 1
-- 7) castdts 0.000 sec 1

Also, the executor log showed some disk spilling:

17/05/08 13:20:00 INFO ExternalSorter: Thread 109 spilling in-memory map of 33.8 GB to disk (1 time so far)
17/05/08 13:20:20 INFO ExternalSorter: Thread 116 spilling in-memory map of 31.2 GB to disk (1 time so far)
...
17/05/08 13:24:50 INFO ExternalAppendOnlyMap: Thread 116 spilling in-memory map of 26.9 GB to disk (1 time so far)
17/05/08 13:25:08 INFO ExternalAppendOnlyMap: Thread 109 spilling in-memory map of 26.6 GB to disk (1 time so far)

Regards,
Mingyang

On Sat, May 6, 2017 at 9:12 PM Matthias Boehm wrote:
> yes, even with the previous patch for improved memory efficiency of
> ultra-sparse matrices in MCSR format, there is still some unnecessary
> overhead that leads to garbage collection.
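In scenario (3), the first argument of DML's `table()` is `seq(1, nrow(FK_colvec))`, so each row i of the result gets a single 1 in column `FK_colvec[i]`. The output is therefore an ultra-sparse 2e7 x 1e6 matrix with one 1 per row, and `sum(FK)` equals `nrow(FK_colvec)`. The minimal Python sketch below mirrors that semantics on a toy input. The helper name is hypothetical, and rejecting out-of-range indices is a choice of the sketch rather than documented DML behavior.

```python
# Hypothetical sketch (not the DML implementation) of what
# table(seq(1, n), FK_colvec, n, m) produces: a 1 at (i, FK_colvec[i]) per row.
from collections import Counter


def table_from_index_vector(fk_colvec, n_rows, n_cols):
    """Return {(row, col): count} with 1-based indices, i.e. a COO-style view
    of the resulting ultra-sparse matrix. Out-of-range indices raise an error
    here; that is an assumption of this sketch, not documented DML behavior."""
    cells = Counter()
    for i, col in enumerate(fk_colvec, start=1):
        col = int(col)
        if not (1 <= i <= n_rows and 1 <= col <= n_cols):
            raise ValueError(f"cell ({i}, {col}) outside {n_rows} x {n_cols}")
        cells[(i, col)] += 1
    return dict(cells)


if __name__ == "__main__":
    fk_colvec = [3, 1, 3, 5]
    fk = table_from_index_vector(fk_colvec, len(fk_colvec), 5)
    print(fk)                # {(1, 3): 1, (2, 1): 1, (3, 3): 1, (4, 5): 1}
    print(sum(fk.values()))  # 4, i.e. nrow(FK_colvec)
```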
Re: Sparse Matrix Storage Consumption Issue
Yes, even with the previous patch for improved memory efficiency of ultra-sparse matrices in MCSR format, there is still some unnecessary overhead that leads to garbage collection. For this reason, I would recommend reading it as a vector and converting it in memory to an ultra-sparse matrix. I also just pushed a minor performance improvement for reading ultra-sparse matrices, but the major bottleneck still exists.

The core issue is that we can't read these ultra-sparse matrices into a CSR representation because CSR does not allow for efficient incremental construction (with unordered inputs and multi-threaded reads). However, I created SYSTEMML-1587 to solve this in the general case. The idea is to read ultra-sparse matrices into thread-local COO deltas and finally merge them into a CSR representation. The initial results are very promising, and the approach is safe because the temporary memory requirements are covered by the MCSR estimate, but it will take a while because I want to introduce this consistently for all readers (single-/multi-threaded, all formats).

In contrast to the read issue, I was not able to reproduce the described performance issue of distributed caching. Could you please double-check that this test also used the current master build, and perhaps share the detailed setup again (e.g., number of executors, data distribution, etc.)? Thanks.

Regards,
Matthias

On Thu, May 4, 2017 at 9:55 PM, Mingyang Wang wrote:
> Out of curiosity, I increased the driver memory to 10GB, and then all
> operations were executed on CP. It took 37.166s but JVM GC took 30.534s.
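The reply describes the SYSTEMML-1587 approach only at a high level, and SystemML's readers are Java code. The following Python sketch is one plausible way to perform the final merge step. It assumes 0-based `(row, col, value)` triples, no duplicate coordinates, and a hypothetical function name. The merge uses a counting pass, a prefix sum for the row pointers, a scatter pass, and a per-row sort to restore column order after unordered, multi-threaded input.

```python
# Hypothetical sketch of the idea described above: readers append
# (row, col, value) triples to thread-local COO buffers in any order, and a
# final pass merges them into a single CSR representation.
from typing import List, Tuple

Triple = Tuple[int, int, float]  # (row, col, value), 0-based


def merge_coo_deltas_to_csr(deltas: List[List[Triple]], n_rows: int):
    """Merge unordered per-thread COO deltas into CSR arrays
    (row_ptr, col_idx, values). Assumes no duplicate coordinates."""
    # 1) count non-zeros per row across all deltas
    counts = [0] * n_rows
    for delta in deltas:
        for r, _, _ in delta:
            counts[r] += 1
    # 2) exclusive prefix sum over the counts gives the row pointers
    row_ptr = [0] * (n_rows + 1)
    for r in range(n_rows):
        row_ptr[r + 1] = row_ptr[r] + counts[r]
    nnz = row_ptr[n_rows]
    col_idx = [0] * nnz
    values = [0.0] * nnz
    # 3) scatter each triple into the next free slot of its row segment
    pos = row_ptr[:-1]  # slicing creates a copy, so row_ptr stays intact
    for delta in deltas:
        for r, c, v in delta:
            col_idx[pos[r]] = c
            values[pos[r]] = v
            pos[r] += 1
    # 4) inputs were unordered, so sort column indices within each row
    for r in range(n_rows):
        lo, hi = row_ptr[r], row_ptr[r + 1]
        if hi - lo > 1:
            seg = sorted(zip(col_idx[lo:hi], values[lo:hi]))
            col_idx[lo:hi] = [c for c, _ in seg]
            values[lo:hi] = [v for _, v in seg]
    return row_ptr, col_idx, values


if __name__ == "__main__":
    thread_a = [(2, 5, 1.0), (0, 3, 2.0)]
    thread_b = [(0, 1, 4.0)]
    print(merge_coo_deltas_to_csr([thread_a, thread_b], n_rows=3))
    # ([0, 2, 2, 3], [1, 3, 5], [4.0, 2.0, 1.0])
```

The result is three flat arrays regardless of the number of rows. An MCSR-style layout, as we understand it, keeps a separate small row object per non-empty row instead. With one non-zero per row, as for FK, that means millions of tiny objects, which is consistent with the GC overhead mentioned in the reply.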
Re: Sparse Matrix Storage Consumption Issue
Out of curiosity, I increased the driver memory to 10GB, and then all operations were executed on CP. It took 37.166s, but JVM GC took 30.534s. I was wondering whether this is the expected behavior.

Total elapsed time: 38.093 sec.
Total compilation time: 0.926 sec.
Total execution time: 37.166 sec.
Number of compiled Spark inst: 0.
Number of executed Spark inst: 0.
Cache hits (Mem, WB, FS, HDFS): 0/0/0/1.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 30.400/0.000/0.001/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time: 0.000 sec.
Spark ctx create time (lazy): 0.000 sec.
Spark trans counts (par,bc,col):0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 22.302 sec.
Total JVM GC count: 11.
Total JVM GC time: 30.534 sec.
Heavy hitter instructions (name, time, count):
-- 1) uak+ 37.166 sec 1
-- 2) == 0.001 sec 1
-- 3) + 0.000 sec 1
-- 4) print 0.000 sec 1
-- 5) rmvar 0.000 sec 5
-- 6) createvar 0.000 sec 1
-- 7) assignvar 0.000 sec 1
-- 8) cpvar 0.000 sec 1

Regards,
Mingyang

On Thu, May 4, 2017 at 9:48 PM Mingyang Wang wrote:
> Hi Matthias,
>
> Thanks for the patch.
Re: Sparse Matrix Storage Consumption Issue
Hi Matthias,

Thanks for the patch.

I re-ran the experiment and observed that there was indeed no more memory pressure, but this simple script still took ~90s. What is the bottleneck in this case?

Total elapsed time: 94.800 sec.
Total compilation time: 1.826 sec.
Total execution time: 92.974 sec.
Number of compiled Spark inst: 2.
Number of executed Spark inst: 2.
Cache hits (Mem, WB, FS, HDFS): 1/0/0/0.
Cache writes (WB, FS, HDFS): 0/0/0.
Cache times (ACQr/m, RLS, EXP): 0.000/0.000/0.000/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/0.
HOP DAGs recompile time: 0.000 sec.
Spark ctx create time (lazy): 0.860 sec.
Spark trans counts (par,bc,col):0/0/0.
Spark trans times (par,bc,col): 0.000/0.000/0.000 secs.
Total JIT compile time: 3.498 sec.
Total JVM GC count: 5.
Total JVM GC time: 0.064 sec.
Heavy hitter instructions (name, time, count):
-- 1) sp_uak+ 92.597 sec 1
-- 2) sp_chkpoint 0.377 sec 1
-- 3) == 0.001 sec 1
-- 4) print 0.000 sec 1
-- 5) + 0.000 sec 1
-- 6) castdts 0.000 sec 1
-- 7) createvar 0.000 sec 3
-- 8) rmvar 0.000 sec 7
-- 9) assignvar 0.000 sec 1
-- 10) cpvar 0.000 sec 1

Regards,
Mingyang

On Wed, May 3, 2017 at 8:54 AM Matthias Boehm wrote:
> to summarize, this was an issue of selecting serialized representations
> for large ultra-sparse matrices.
Re: Sparse Matrix Storage Consumption Issue
To summarize, this was an issue of selecting serialized representations for large ultra-sparse matrices. Thanks again for sharing your feedback with us.

1) In-memory representation: In CSR, every non-zero requires 12 bytes, which is 240MB in your case. The overall memory consumption, however, depends on the distribution of non-zeros: in CSR, each block with at least one non-zero requires 4KB for row pointers. Assuming a uniform distribution (the worst case), this gives us 80GB. This is likely the problem here. Every empty block would have an overhead of 44 bytes, but under the worst-case assumption there are no empty blocks left. We do not use COO for checkpoints because it would slow down subsequent operations.

2) Serialized/on-disk representation: For sparse datasets that are expected to exceed aggregate memory, we used to use a serialized representation (with storage level MEM_AND_DISK_SER), which uses sparse, ultra-sparse, or empty representations. In this form, ultra-sparse blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes. Therefore, with this representation selected, your dataset should easily fit in aggregate memory. Also, note that chkpoint is only a transformation that persists the RDD; the subsequent operation then pulls the data into memory.

At a high level, this was a bug. We missed ultra-sparse representations when introducing an improvement that converts sparse matrices from MCSR to CSR format on checkpoints, which eliminated the need for a serialized storage level. I just delivered a fix. Now we again store such ultra-sparse matrices in serialized form, which should significantly reduce the memory pressure.

Regards,
Matthias

On 5/3/2017 9:38 AM, Mingyang Wang wrote:
> Hi all,
>
> I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one
> non-zero value on each row, that is, 2e7 non-zero values in total.
>
> With a driver memory of 1GB and an executor memory of 100GB, I found that
> the HOP "Spark chkpoint", which is used to pin the FK matrix in memory, is
> really expensive, as it invokes lots of disk operations.
>
> FK is stored in binary format with 24 blocks, each ~45MB, ~1GB in total.
>
> For example, with the script
>
> """
> FK = read($FK)
> print("Sum of FK = " + sum(FK))
> """
>
> things worked fine, and it took ~8s.
>
> With the script
>
> """
> FK = read($FK)
> if (1 == 1) {}
> print("Sum of FK = " + sum(FK))
> """
>
> however, things changed. It took ~92s, and I observed lots of disk spills
> in the logs. Based on the stats from the Spark UI, it seems the
> materialized FK requires 54GB of storage and thus introduces disk operations.
>
> I was wondering, is this the expected behavior of a super sparse matrix?
>
> Regards,
> Mingyang
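The figures in the May 3 summary can be reproduced with a few lines of arithmetic. The sketch below assumes 1000 x 1000 blocks and reads the 12 bytes per non-zero as a 4-byte column index plus an 8-byte double value; the function name is hypothetical. It also totals the 9 + 16*nnz rule for serialized ultra-sparse blocks, which the reply states but does not sum up.

```python
# Back-of-the-envelope memory estimates for the 2e7 x 1e6 FK matrix, following
# the per-element and per-block costs quoted in the May 3 reply.
# Assumes 1000 x 1000 blocks; estimates() is a hypothetical helper.
ROWS, COLS = 20_000_000, 1_000_000
NNZ = 20_000_000   # one non-zero per row
BLEN = 1000        # assumed block size (rows and columns per block)


def estimates(rows=ROWS, cols=COLS, nnz=NNZ, blen=BLEN):
    """Return byte estimates for the CSR and serialized representations."""
    n_blocks = -(-rows // blen) * -(-cols // blen)  # ceiling division
    return {
        "blocks": n_blocks,
        # CSR payload: 4-byte column index + 8-byte double per non-zero
        "csr_payload": 12 * nnz,
        # worst case: every block is non-empty and needs ~4 KB of int row pointers
        "csr_rowptr_worst": n_blocks * 4 * blen,
        # serialized: 9 bytes per block (empty or ultra-sparse)
        # plus 16 bytes per non-zero in ultra-sparse blocks
        "serialized": 9 * n_blocks + 16 * nnz,
    }


if __name__ == "__main__":
    est = estimates()
    print(f"blocks:           {est['blocks']:,}")
    for key in ("csr_payload", "csr_rowptr_worst", "serialized"):
        print(f"{key:<17} {est[key] / 1e9:8.2f} GB")
```

This yields 0.24 GB for the CSR payload, 80 GB of worst-case row pointers, and about 0.5 GB in serialized form, which illustrates why the choice of storage level matters so much for this matrix.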