Hello,

When you set the Spark config as below do you still get one task?

Unfortunately yes.

Currently I am looking for the very first shuffle stage in SimilarityAnalysis#rowSimilarity but cannot find it. There is a lot of mapping, wrapping and caching during SimilarityAnalysis#sampleDownAndBinarize, and I cannot figure out where to look for the code behind "%*%" in:

// Compute row similarity cooccurrence matrix AA'
val drmAAt = drmA %*% drmA.t

I would like to hard-code the partition number in that first shuffle, just for the sake of experiment.
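
As a fallback I might simply widen the partitioning of the underlying Spark RDD before the multiply. A rough sketch of what I have in mind (untested; drmWrap, the .rdd accessor on the checkpointed DRM and the count of 400 are my assumptions about the Spark bindings API, not verified code):

    import org.apache.mahout.math.drm.RLikeDrmOps._
    import org.apache.mahout.sparkbindings._

    // untested sketch: repartition the row-wise RDD behind drmA and wrap it back
    // into a DRM, hoping the first shuffle of the multiply inherits 400 partitions
    val widened = drmWrap(drmA.checkpoint().rdd.repartition(400), ncol = drmA.ncol)
    val drmAAt  = widened %*% widened.t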

On 13.10.2014 18:29, Pat Ferrel wrote:
I see no place where spark.default.parallelism is set, so your config can set it to whatever you wish. When you set the Spark config as below do you still get one task? The test suite sets spark.default.parallelism to 10 before the context is initialized. To do this with SimilarityAnalysis.rowSimilarity (here I assume you are modifying the driver), put the .set("spark.default.parallelism", "400") in RowSimilarityDriver.start and see if that changes things.
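
Something along these lines, set before the context is created (a sketch of the shape of the change only; the app name and the rest of the builder are placeholders, not the actual driver code):

    import org.apache.spark.SparkConf

    // sketch only: the key has to be on the SparkConf before the context is
    // created, and SparkConf.set takes the value as a String
    val conf = new SparkConf()
      .setAppName("RowSimilarityDriver")
      .set("spark.default.parallelism", "400")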

If this doesn’t work it may be that the blas optimizer is doing something with the value, but I’m lost in that code. There is only one place the value is read, which is in Par.scala:

         // auto adjustment, try to scale up to either x1Size or x2Size.
         val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt

         val x1Size = (clusterSize * .95).ceil.toInt
         val x2Size = (clusterSize * 1.9).ceil.toInt

         if (rdd.partitions.size <= x1Size)
           rdd.coalesce(numPartitions = x1Size, shuffle = true)
         else if (rdd.partitions.size <= x2Size)
           rdd.coalesce(numPartitions = x2Size, shuffle = true)
         else
           rdd.coalesce(numPartitions = rdd.partitions.size)
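
If the setting were picked up, the auto adjustment should work out roughly like this:

    // assuming spark.default.parallelism = 400 is actually read
    val clusterSize = 400
    val x1Size = (clusterSize * .95).ceil.toInt  // 380
    val x2Size = (clusterSize * 1.9).ceil.toInt  // 760
    // a 1-partition input falls into the first branch, so it should be
    // coalesced up to 380 partitions with shuffle = true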


Dmitriy, can you shed any light on the use of spark.default.parallelism, how to increase it, or how to get more than one task created when performing ABt?


On Oct 13, 2014, at 8:56 AM, Reinis Vicups <[email protected]> wrote:

Hi,

I am currently testing SimilarityAnalysis.rowSimilarity and I am wondering how I could increase the number of tasks used for the distributed shuffle.

What I currently observe is that SimilarityAnalysis requires almost 20 minutes for my dataset in this stage alone:

combineByKey at ABt.scala:126

When I view the details for the stage, I see that only one task is spawned, running on one node.
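
As far as I understand plain Spark (this is only an illustration, not the Mahout ABt code), the width of a combineByKey shuffle can be pinned with the explicit numPartitions argument; without it, the width comes from the default partitioner, which is derived from the parent RDDs and from spark.default.parallelism when it is set:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair-RDD operations

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("combineByKey-width"))
    // a single input partition, mimicking what I see in the ABt stage
    val pairs = sc.parallelize(Seq((1, 1.0), (2, 2.0), (1, 3.0)), 1)

    // shuffle width chosen by the default partitioner
    val byDefault = pairs.combineByKey(
      (v: Double) => v,
      (c: Double, v: Double) => c + v,
      (c1: Double, c2: Double) => c1 + c2)

    // shuffle width hard-coded to 400 via the explicit numPartitions overload
    val hardCoded = pairs.combineByKey(
      (v: Double) => v,
      (c: Double, v: Double) => c + v,
      (c1: Double, c2: Double) => c1 + c2,
      400)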

I have my own implementation of SimilarityAnalysis, and by tuning the number of tasks I have achieved huge performance gains.

Since I couldn't find how to pass the number of tasks to the shuffle operations directly, I have set the following in the Spark config:

configuration = new SparkConf().setAppName(jobConfig.jobName)
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
        .set("spark.kryo.referenceTracking", "false")
        .set("spark.kryoserializer.buffer.mb", "200")
        .set("spark.default.parallelism", "400") // <- this line is supposed to set default parallelism to some high number

Thank you for your help
reinis

