Re: parallelize for a large Seq is extremely slow.

2014-04-27 Thread Earthson
That doesn't work. I don't think it is just slow; it never ends (I killed it after 30+ hours).





Re: parallelize for a large Seq is extremely slow.

2014-04-27 Thread Earthson
It's my fault! I uploaded the wrong jar when I changed the number of partitions, and now it just works fine :)

The size of word_mapping is 2444185.

So does serializing a large object really take that long? I don't think two million entries is very large; serializing a collection of this size locally typically takes less than a second.
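
For reference, a rough local sketch of what I mean by the local cost (synthetic data of about the same size, plain Java serialization; only meant as a ballpark):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerTiming {
  def main(args: Array[String]) {
    // hypothetical stand-in for word_mapping: ~2.4 million (word, index) pairs
    val mapping: Map[String, Int] = (0 until 2444185).map(i => ("word" + i, i)).toMap
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    val start = System.nanoTime()
    out.writeObject(mapping)
    out.close()
    val elapsedMs = (System.nanoTime() - start) / 1e6
    println(s"serialized ${mapping.size} entries into ${bytes.size} bytes in $elapsedMs ms")
  }
}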

Thanks for the help:)





Re: parallelize for a large Seq is extremely slow.

2014-04-26 Thread Earthson
Using reduceByKey(_+_).countByKey instead of a plain countByKey seems to be fast.
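
A minimal sketch of the difference, using the tf_map computation from the job below in this archive (note that after reduceByKey each word occurs exactly once, so a countByKey on the reduced RDD would report 1 per word; collectAsMap is shown here as an assumed way to keep the summed counts):

// "spark" is the SparkContext from the job below
val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
val pairs = file.flatMap(_.split("\t")).map((_, 1))

// original version: counts every (word, 1) pair into a local Map on the driver
val tfSlow = pairs.countByKey()

// aggregate on the cluster first, then collect one summed entry per word
val tfFast = pairs.reduceByKey(_ + _).collectAsMap()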





Re: parallelize for a large Seq is extremely slow.

2014-04-26 Thread Earthson
parallelize is still so slow. 



package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
    kryo.register(classOf[Map[String,Long]])
    kryo.register(classOf[Seq[(String,Long)]])
    kryo.register(classOf[Seq[(String,Int)]])
  }
}

object WFilter2 {
  def initspark(name: String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //.set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryoserializer.buffer.mb", "256")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
      .set("spark.cores.max", "30")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    // stop words, broadcast to every executor
    val stopset = spark.broadcast(
      Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet)
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    // term frequency per word
    val tf_map = spark.broadcast(
      file.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_ + _).countByKey)
    // document frequency per word
    val df_map = spark.broadcast(
      file.flatMap(x => Set(x.split("\t"): _*).toBuffer).map((_, 1)).reduceByKey(_ + _).countByKey)
    // word -> integer id
    val word_mapping = spark.broadcast(Map(df_map.value.keys.zipWithIndex.toBuffer: _*))
    def w_filter(w: String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset.value contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}





Re: parallelize for a large Seq is extremely slow.

2014-04-26 Thread Aaron Davidson
Could it be that the default number of partitions used by parallelize() is too
small in this case? Try something like
spark.parallelize(word_mapping.value.toSeq, 60). (Given your setup, it
should already be 30, but perhaps that's not the case in YARN mode...)
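
A minimal sketch of what I mean (60 is only an example value; the second form changes the default that parallelize() falls back on):

// "spark" and "word_mapping" refer to the job being discussed
spark.parallelize(word_mapping.value.toSeq, 60)
  .saveAsTextFile("hdfs://ns1/nlp/word_mapping")

// or raise the default parallelism when building the SparkConf
val conf = new SparkConf().set("spark.default.parallelism", "60")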


On Fri, Apr 25, 2014 at 11:38 PM, Earthson earthson...@gmail.com wrote:

 parallelize is still so slow.

 [quoted code snipped; the full message appears earlier in this archive]



Re: parallelize for a large Seq is extremely slow.

2014-04-25 Thread Earthson
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91)
... 10 more





parallelize for a large Seq is extremely slow.

2014-04-24 Thread Earthson Lu
spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")

this line is too slow. There are about 2 million elements in word_mapping.

*Is there a good style for writing a large collection to hdfs?*

import org.apache.spark._
import SparkContext._
import scala.io.Source

object WFilter {
  def main(args: Array[String]) {
    val spark = new SparkContext("yarn-standalone", "word filter",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    val stopset =
      Source.fromURL(this.getClass.getResource("stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    // term frequency per word
    val tf_map = spark.broadcast(file.flatMap(_.split("\t")).map((_, 1)).countByKey)
    // document frequency per word
    val df_map = spark.broadcast(
      file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey)
    // word -> integer id
    val word_mapping = spark.broadcast(Map(df_map.value.keys.zipWithIndex.toBuffer: _*))
    def w_filter(w: String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}


many thx:)

-- 

~
Perfection is achieved
not when there is nothing more to add
 but when there is nothing left to take away


Re: parallelize for a large Seq is extremely slow.

2014-04-24 Thread Matei Zaharia
Try setting the serializer to org.apache.spark.serializer.KryoSerializer (see 
http://spark.apache.org/docs/0.9.1/tuning.html); it should be considerably 
faster.
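
For example, on the SparkConf before the SparkContext is created (a minimal sketch; registering the classes you broadcast through a KryoRegistrator, as the tuning guide describes, also helps but is optional):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-standalone")   // master and app name as in your job
  .setAppName("word filter")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optionally: .set("spark.kryo.registrator", "your.package.MyRegistrator")
val sc = new SparkContext(conf)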

Matei

On Apr 24, 2014, at 8:01 PM, Earthson Lu earthson...@gmail.com wrote:

 spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
 
 this line is too slow. There are about 2 million elements in word_mapping.
 
 [rest of the quoted message snipped; the full original appears earlier in this archive]



Re: parallelize for a large Seq is extremely slow.

2014-04-24 Thread Earthson
Kryo fails with the exception below:

com.esotericsoftware.kryo.KryoException (com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 1)
com.esotericsoftware.kryo.io.Output.require(Output.java:138)
com.esotericsoftware.kryo.io.Output.writeAscii_slow(Output.java:446)
com.esotericsoftware.kryo.io.Output.writeString(Output.java:306)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153)
com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:21)
com.twitter.chill.SomeSerializer.write(SomeSerializer.scala:19)
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:124)
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:223)
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)

~~~

package com.semi.nlp

import org.apache.spark._
import SparkContext._
import scala.io.Source
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Map[String,Int]])
  }
}

object WFilter2 {
  def initspark(name: String) = {
    val conf = new SparkConf()
      .setMaster("yarn-standalone")
      .setAppName(name)
      .setSparkHome(System.getenv("SPARK_HOME"))
      .setJars(SparkContext.jarOfClass(this.getClass))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
    new SparkContext(conf)
  }

  def main(args: Array[String]) {
    val spark = initspark("word filter mapping")
    val stopset =
      Source.fromURL(this.getClass.getResource("/stoplist.txt")).getLines.map(_.trim).toSet
    val file = spark.textFile("hdfs://ns1/nlp/wiki.splited")
    // term frequency per word
    val tf_map = spark.broadcast(file.flatMap(_.split("\t")).map((_, 1)).countByKey)
    // document frequency per word
    val df_map = spark.broadcast(
      file.map(x => Set(x.split("\t"): _*)).flatMap(_.map(_ -> 1)).countByKey)
    // word -> integer id
    val word_mapping = spark.broadcast(Map(df_map.value.keys.zipWithIndex.toBuffer: _*))
    def w_filter(w: String) =
      if (tf_map.value(w) < 8 || df_map.value(w) < 4 || (stopset contains w)) false else true
    val mapped =
      file.map(_.split("\t").filter(w_filter).map(w => word_mapping.value(w)).mkString("\t"))

    spark.parallelize(word_mapping.value.toSeq).saveAsTextFile("hdfs://ns1/nlp/word_mapping")
    mapped.saveAsTextFile("hdfs://ns1/nlp/lda/wiki.docs")
    spark.stop()
  }
}
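
The Buffer overflow in the trace suggests Kryo's serialization buffer is too small for the two-million-entry broadcast map; a minimal sketch of the setting to raise (the revised job earlier in this archive uses 256 MB):

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.semi.nlp.MyRegistrator")
  .set("spark.kryoserializer.buffer.mb", "256")   // only a few MB by default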


