Thanks Pat, that's good to know!

This is the "reduce" step (which gets its own stage in my Spark jobs...this 
stage takes almost all the runtime) where most of the work is being done, and 
takes longer the more shuffle partitions there are (relative to # of CPUs):

https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala#L258
 


Why does the runtime of this reduce stage (that ultimately calls 
SequentialAccessSparseVector.getQuick() and setQuick() a lot) depend on the 
ratio of (# Spark CPUs / spark.sql.shuffle.partitions)? Essentially that ratio 
determines how many "chunks" of shuffle partition (reduce) tasks must run, and 
each of those chunks always takes the same amount of time, so the stage 
finishes in less time when that ratio is low (preferably 1).

EXAMPLES - using 32 cores and 200 shuffle partitions, this stage requires 
ceil(145 tasks / 32 cores) = 5 "chunks" of work (145 tasks instead of 200 
because of the estimateProductPartitions call in AtA). Each chunk takes ~8 
minutes, so (5 chunks * 8 min) = ~40 mins. For a job with 32 cores and 32 
shuffle partitions (same CPU resources, still 100% utilized), this stage 
requires only ceil(23 tasks / 32 cores) = 1 chunk of work, which takes the same 
8 minutes, so the job finishes ~5x faster. You can take this to the extreme 
with just 1 core and 1 shuffle partition, and the stage still takes the same 
amount of time! I'd love to know if you can reproduce this behavior.

This goes against most advice and experience I've had with Spark, where you 
want to *increase* your partitioning in many cases (or at least leave it at the 
default 200, not lower it dramatically) to utilize CPUs better (and shrink each 
individual partition's task). There seems to be no reduction in computational 
complexity *per task* (within this stage I'm talking about) even with high 
values for spark.sql.shuffle.partitions (so it seems the data isn't actually 
being partitioned by the shuffle process). Refer back to the timings w/various 
configs in my first message.

Also...is there a possibility of using a faster hash-based implementation 
instead of the setQuick() / getQuick() methods of SequentialAccessSparseVector? 
The javadoc on those methods mentions they shouldn't be used unless absolutely 
necessary due to their O(log n) complexity.


Thanks for your time...this is fun stuff!
Matt



On 8/15/17, 10:15 AM, "Pat Ferrel" <p...@occamsmachete.com> wrote:

>Great, this is the best way to use the APIs. The big win with CCO, the algo 
>you are using is with multiple user actions. Be aware that when you go to this 
>methods the input IndexedDatasets must be coerced to have compatible 
>dimensionality, in this case the primary action defines the user-set used in 
>calculating the model—not the one for making queries, which can use anonymous 
>user  history. But that is for later and outside Mahout.
>
>1) 4x max parallelism is a rule of thumb since the cores may not need 100% 
>duty cycle, if they are already at 100% the 4x does no good. 2) you have found 
>a long running task but there will always be one, if it weren’t this one it 
>would be another. Different types of tasks use resources differently. For 
>instance the collects, which must eventually use a the memory of the Driver to 
>instantiate an in-memory data structure. There is no magic choice to make this 
>work differently but it avoid several joins, which are much slower.
>
>I’m not quite sure what your question is.
>
>
>On Aug 15, 2017, at 6:21 AM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>
>Hi Pat,
>
>I've taken some screenshots of my Spark UI to hopefully shed some light on the 
>behavior I'm seeing. Do you mind if I send you a link via direct email (would 
>rather not post it here)? It's just a shared Dropbox folder.
>
>
>Thanks,
>Matt
>
>
>
>On 8/14/17, 11:34 PM, "Scruggs, Matt" <matt.scru...@bronto.com> wrote:
>
>> I'm running a custom Scala app (distributed in a shaded jar) directly 
>> calling SimilarityAnalysis.cooccurrenceIDSs(), not using the CLI.
>> 
>> The input data already gets explicitly repartitioned to spark.cores.max 
>> (defaultParallelism) in our code. I'll try increasing that by the factor of 
>> 4 that you suggest, but all our cores are already utilized so I'm not sure 
>> that will help. It gets bogged down in the post-shuffle (shuffle read / 
>> combine / reduce) phase even with all cores busy the whole time, which is 
>> why I've been playing around with various values for 
>> spark.sql.shuffle.partitions. The O(log n) operations I mentioned seem to 
>> take >95% of runtime.
>> 
>> Thanks,
>> Matt
>> ________________________________
>> From: Pat Ferrel <p...@occamsmachete.com>
>> Sent: Monday, August 14, 2017 11:02:42 PM
>> To: user@mahout.apache.org
>> Subject: Re: spark-itemsimilarity scalability / Spark parallelism issues 
>> (SimilarityAnalysis.cooccurrencesIDSs)
>> 
>> Are you using the CLI? If so it’s likely that there is only one partition of 
>> the data. If you use Mahout in the Spark shell or using it as a lib, do a 
>> repartition on the input data before passing it into 
>> SimilarityAnalysis.cooccurrencesIDSs. I repartition to 4*total cores to 
>> start with and set max parallelism for spark to the same. The CLI isn’t 
>> really production worthy, just for super easy experiments with CSVs.
>> 
>> 
>> On Aug 14, 2017, at 2:31 PM, Scruggs, Matt <matt.scru...@bronto.com> wrote:
>> 
>> Howdy,
>> 
>> I'm running SimilarityAnalysis.cooccurrencesIDSs on a fairly small dataset 
>> (about 870k [user, item] rows in the primary action IDS…no cross 
>> co-occurrence IDS) and I noticed it scales strangely. This is with Mahout 
>> 0.13.0 although the same behavior happens in 0.12.x as well (haven't tested 
>> it before that).
>> 
>> TLDR - regardless of the Spark parallelism (CPUs) I throw at this routine, 
>> every Spark task within the final / busy stage seems to take the same amount 
>> of time, which leads me to guess that every shuffle partition contains the 
>> same amount of data (perhaps the full dataset matrix in shape/size, albeit 
>> with different values). I'm reaching out to see if this is a known 
>> algorithmic complexity issue in this routine, or if my config is to blame 
>> (or both).
>> 
>> Regarding our hardware, we have identical physical machines in a Mesos 
>> cluster with 6 workers and a few masters. Each worker has ~500GB of SSD, 32 
>> cores and 128g RAM. We run lots of Spark jobs and have generally ironed out 
>> the kinks in terms of hardware and cluster config, so I don't suspect any 
>> hardware-related issues.
>> 
>> Here are some timings for SimilarityAnalysis.cooccurrencesIDSs on this 
>> dataset with maxNumInteractions = 500, maxInterestingItemsPerThing = 20, 
>> randomSeed = default, parOpts = default (there's lots of other Spark config, 
>> this is just what I'm varying to check for effects). In particular, notice 
>> how the ratio of (spark.sql.shuffle.partitions / spark.cores.max) affects 
>> the runtime:
>> 
>> * 8 executors w/8 cores each, takes about 45 minutes
>> * note that spark.sql.shuffle.partitions > spark.cores.max
>> spark.cores.max = 64
>> spark.executor.cores = 8
>> spark.sql.shuffle.partitions = 200 (default)
>> 
>> * 1 executors w/24 cores, takes about 65 minutes
>> * note that spark.sql.shuffle.partitions >>> spark.cores.max
>> spark.cores.max = 24
>> spark.executor.cores = 24
>> spark.sql.shuffle.partitions = 200 (default)
>> 
>> * 1 executor w/8 cores, takes about 8 minutes
>> * note that spark.sql.shuffle.partitions = spark.cores.max
>> spark.cores.max = 8
>> spark.executor.cores = 8 (1 executor w/8 cores)
>> spark.sql.shuffle.partitions = 8
>> 
>> * 1 executor w/24 cores, takes about 8 minutes (same as 8 cores!)
>> * note that spark.sql.shuffle.partitions = spark.cores.max
>> spark.cores.max = 24
>> spark.executor.cores = 24 (1 executor w/24 cores)
>> spark.sql.shuffle.partitions = 24
>> 
>> * 32 executors w/2 cores each, takes about 8 minutes (same as 8 cores!)
>> * note that spark.sql.shuffle.partitions = spark.cores.max
>> spark.cores.max = 64
>> spark.executor.cores = 2
>> spark.sql.shuffle.partitions = 88 (results in 64 tasks for final stage)
>> 
>> Adjusting the "maxNumInteractions" parameter down to 100 and 50 results in a 
>> minor improvement (5-10%). I've also played around with removing [user, 
>> item] rows from the input dataset for users with only 1 interaction…I read 
>> to try that in another thread…that yielded maybe a 40-50% speed improvement, 
>> but I'd rather not toss out data (unless it truly is totally useless, of 
>> course :D ).
>> 
>> When I look at the thread dump within the Spark UI's Executors -> thread 
>> dump pages, it seems all the executors are very busy in the code pasted 
>> below for >95% of the run. GC throughput is very good so we're not bogged 
>> down there...it's just super busy doing running the code below. I am 
>> intrigued about the comments on the SequentialAccessSparseVector methods I 
>> see being called (getQuick and setQuick), which state they take O(log n) 
>> time 
>> (https://github.com/apache/mahout/blob/08e02602e947ff945b9bd73ab5f0b45863df3e53/math/src/main/java/org/apache/mahout/math/SequentialAccessSparseVector.java).
>> 
>> 
>> Thanks all for your time and feedback!
>> 
>> Matt Scruggs
>> 
>> org.apache.mahout.math.OrderedIntDoubleMapping.find(OrderedIntDoubleMapping.java:105)
>> org.apache.mahout.math.OrderedIntDoubleMapping.get(OrderedIntDoubleMapping.java:110)
>> org.apache.mahout.math.SequentialAccessSparseVector.getQuick(SequentialAccessSparseVector.java:157)
>> org.apache.mahout.math.SparseRowMatrix.getQuick(SparseRowMatrix.java:90)
>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> 
>> ……or this code……
>> 
>> org.apache.mahout.math.SparseRowMatrix.setQuick(SparseRowMatrix.java:105)
>> org.apache.mahout.math.AbstractMatrix.assign(AbstractMatrix.java:240)
>> org.apache.mahout.math.scalabindings.MatrixOps.$plus$eq(MatrixOps.scala:45)
>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>> org.apache.mahout.sparkbindings.blas.AtA$$anonfun$19.apply(AtA.scala:258)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:151)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$3.apply(ExternalAppendOnlyMap.scala:150)
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:163)
>> org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
>> org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
>> org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>> org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>> org.apache.spark.scheduler.Task.run(Task.scala:86)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> 
>

Reply via email to