Could it be that the default number of partitions used by parallelize() is
too small in this case? Try something like
spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it
should already be 30, but perhaps that's not the case in YARN mode...)
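
For reference, here's a rough, untested sketch of the two knobs involved
(the object name, the 60, and the toy data below are just placeholders for
your word_mapping):

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionCheck {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("partition-check")
          // fallback used when parallelize() gets no explicit numSlices argument
          .set("spark.default.parallelism", "60")
        val sc = new SparkContext(conf)

        // toy pairs standing in for word_mapping.value.toSeq
        val data = (1 to 100000).map(i => ("w" + i, i))

        // an explicit numSlices overrides spark.default.parallelism for this RDD
        val rdd = sc.parallelize(data, 60)
        println(rdd.partitions.length) // quick sanity check; should print 60

        sc.stop()
      }
    }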


On Fri, Apr 25, 2014 at 11:38 PM, Earthson <earthson...@gmail.com> wrote:

> parallelize is still so slow.
>
> ~~~~~~~~
>
> package com.semi.nlp
>
> import org.apache.spark._
> import SparkContext._
> import scala.io.Source
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>     override def registerClasses(kryo: Kryo) {
>         kryo.register(classOf[Map[String,Int]])
>         kryo.register(classOf[Map[String,Long]])
>         kryo.register(classOf[Seq[(String,Long)]])
>         kryo.register(classOf[Seq[(String,Int)]])
>     }
> }
>
> object WFilter2 {
>     def initspark(name:String) = {
>         val conf = new SparkConf()
>                     .setMaster("yarn-standalone")
>                     .setAppName(name)
>                     .setSparkHome(System.getenv("SPARK_HOME"))
>                     .setJars(SparkContext.jarOfClass(this.getClass))
>                     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>                     //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
>                     .set("spark.kryoserializer.buffer.mb", "256")
>                     .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
>                     .set("spark.cores.max", "30")
>         new SparkContext(conf)
>     }
>
>     def main(args: Array[String]) {
>         val spark = initspark("word filter mapping")
>         val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
>         val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
>         val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_,1)).reduceByKey(_+_).countByKey
>         val df_map = spark broadcast file.flatMap(x=>Set(x.split("\t"):_*).toBuffer).map((_,1)).reduceByKey(_+_).countByKey
>         val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer:_*)
>         def w_filter(w:String) = if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
>         val mapped = file.map(_.split("\t").filter(w_filter).map(w=>word_mapping.value(w)).mkString("\t"))
>
>         spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
>         mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
>         spark.stop()
>     }
> }
>
>
>
>
