Re: parallelize for a large Seq is extremely slow.
That does not work. I don't think it is just slow; it never ends (I killed it after 30+ hours).
Re: parallelize for a large Seq is extremely slow.
It was my fault! I uploaded the wrong jar when I changed the number of partitions, and now it just works fine :)

The size of word_mapping is 2,444,185. So does serializing a large object really take that long? I don't think two million elements is very large, because handling a collection of that size locally typically costs less than one second.

Thanks for the help :)
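For what it's worth, the local-cost claim is easy to check. Below is a minimal sketch, not from the thread, that times plain Java serialization of a collection of that size; the synthetic "wordN" keys are an assumption (the real keys are wiki tokens):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // ~2.4M (String, Int) pairs with synthetic keys.
    val pairs = (0 until 2444185).map(i => ("word" + i, i))
    val t0 = System.nanoTime
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(pairs)  // serialize the whole collection at once
    oos.close()
    println("serialized " + bos.size + " bytes in " + (System.nanoTime - t0) / 1e9 + "s")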
Re: parallelize for a large Seq is extremely slow.
Using reduceByKey(_ + _).countByKey instead of countByKey alone seems to be fast.
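A plausible explanation, assuming the countByKey of that Spark era merged per-partition tallies at the driver: reduceByKey combines map-side first, so only one record per distinct word crosses the network instead of every (word, 1) pair. One caveat: after reduceByKey every key is distinct, so countByKey on the result returns 1 for each word; if per-word totals are what is wanted, collecting the reduced counts directly preserves them. A minimal sketch, where spark is the SparkContext from the posts below:

    val words = spark.textFile("hdfs://ns1/nlp/wiki.splited").flatMap(_.split("\t"))
    // Every (word, 1) pair is tallied with a driver-side merge:
    val tfSlow = words.map((_, 1)).countByKey()                       // word -> occurrences
    // Map-side combining first, then one record per distinct word is collected:
    val tfFast = words.map((_, 1)).reduceByKey(_ + _).collectAsMap()  // word -> occurrences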
Re: parallelize for a large Seq is extremely slow.
parallelize is still so slow.

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
        kryo.register(classOf[Map[String,Long]])
        kryo.register(classOf[Seq[(String,Long)]])
        kryo.register(classOf[Seq[(String,Int)]])
    }
}

object WFilter2 {
    def initspark(name: String) = {
        val conf = new SparkConf()
            .setMaster("yarn-standalone")
            .setAppName(name)
            .setSparkHome(System.getenv("SPARK_HOME"))
            .setJars(SparkContext.jarOfClass(this.getClass))
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryoserializer.buffer.mb", "256")
            .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
            .set("spark.cores.max", "30")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = spark broadcast Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).countByKey
        val df_map = spark broadcast file.flatMap(x => Set(x.split("\t"): _*).toBuffer).map((_, 1)).reduceByKey(_ + _).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer: _*)
        def w_filter(w: String) =
            if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}
Re: parallelize for a large Seq is extremely slow.
Could it be that the default number of partitions used by parallelize() is too small in this case? Try something like spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it should already be 30, but perhaps that's not the case in YARN mode...)

On Fri, Apr 25, 2014 at 11:38 PM, Earthson earthson...@gmail.com wrote:
> parallelize is still so slow.
> [...]
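A related knob, as a sketch under the assumption that the standard spark.default.parallelism property is what parallelize falls back on when no slice count is passed explicitly:

    val conf = new SparkConf()
        .set("spark.default.parallelism", "60")  // default numSlices for parallelize (and shuffles)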
Re: parallelize for a large Seq is extremely slow.
        at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
        ... 10 more
parallelize for a large Seq is extremely slow.
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

This line is too slow. There are about 2 million elements in word_mapping. *Is there a good style for writing a large collection to HDFS?*

import org.apache.spark._
import SparkContext._
import scala.io.Source

object WFilter {
    def main(args: Array[String]) {
        val spark = new SparkContext("yarn-standalone", "word filter", System.getenv("SPARK_HOME"),
                                     SparkContext.jarOfClass(this.getClass))
        val stopset = Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_, 1)).countByKey
        val df_map = spark broadcast file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer: _*)
        def w_filter(w: String) =
            if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}

many thx:)

--
~ Perfection is achieved not when there is nothing more to add but when there is nothing left to take away
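One possible answer to that question, as a hedged sketch not taken from any reply in this thread: write the collection straight to HDFS from the driver with the Hadoop FileSystem API, so no RDD has to be built from a local two-million-element Seq. The "hdfs://ns1" URI and output path mirror the poster's code; everything else is an assumption:

    import java.io.PrintWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Connect to the same HDFS namespace the job writes to.
    val fs = FileSystem.get(new java.net.URI("hdfs://ns1"), new Configuration())
    val out = new PrintWriter(fs.create(new Path("/nlp/word_mapping/part-00000")))
    // One "(word,index)" line per entry, matching saveAsTextFile's tuple format.
    word_mapping.value.toSeq.foreach(pair => out.println(pair.toString))
    out.close()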
Re: parallelize for a large Seq is extremely slow.
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see http://spark.apache.org/docs/0.9.1/tuning.html); it should be considerably faster.

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu earthson...@gmail.com wrote:
> spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
> This line is too slow. There are about 2 million elements in word_mapping.
> [...]
Re: parallelize for a large Seq is extremely slow.
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1
        com.esotericsoftware.kryo.io.Output.require(Output.java:138)
        com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
        com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
        com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
        com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
        com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
        com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
        com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
        org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
        org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
        org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[Map[String,Int]])
    }
}

object WFilter2 {
    def initspark(name: String) = {
        val conf = new SparkConf()
            .setMaster("yarn-standalone")
            .setAppName(name)
            .setSparkHome(System.getenv("SPARK_HOME"))
            .setJars(SparkContext.jarOfClass(this.getClass))
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
        new SparkContext(conf)
    }

    def main(args: Array[String]) {
        val spark = initspark("word filter mapping")
        val stopset = Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
        val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
        val tf_map = spark broadcast file.flatMap(_.split("\t")).map((_, 1)).countByKey
        val df_map = spark broadcast file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey
        val word_mapping = spark broadcast Map(df_map.value.keys.zipWithIndex.toBuffer: _*)
        def w_filter(w: String) =
            if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
        val mapped = file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))
        spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
        mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
        spark.stop()
    }
}
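The overflow above is the Kryo output buffer running out of space while serializing the large broadcast map; the poster's follow-up elsewhere in this thread deals with it by enlarging the buffer:

    .set("spark.kryoserializer.buffer.mb", "256")  // enlarge the Kryo serialization buffer

(The 256 MB value is taken from that follow-up, not a general recommendation.)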