On Thu, Mar 19, 2015 at 5:16 AM, sergunok <ser...@gmail.com> wrote:
> Hi,
>
> I am trying to vectorize a corpus of texts on a YARN cluster (about 500K
> texts in 13 files, 100GB in total) located in HDFS.
>
> This process has already taken about 20 hours on a 3-node cluster with 6
> cores and 20GB RAM on each node.

> In my opinion it's too long :-)
>
> I started the task with the following command:
> spark-submit --master yarn --num-executors 9 --executor-memory 5GB
> --executor-cores=2 --driver-memory 5GB weight.py

Only about 9 * 5G * 0.6 ≈ 27G of executor memory will be available for caching
(0.6 being the default spark.storage.memoryFraction), which is far less than the
~100G of input, so you get no benefit from cache(); just remove it.
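
As a back-of-the-envelope sketch of that estimate (the numbers come from your
spark-submit command; 0.6 is the default spark.storage.memoryFraction in Spark
1.x, and overheads are ignored, so treat it as rough):

num_executors = 9         # from --num-executors
executor_memory_gb = 5    # from --executor-memory
storage_fraction = 0.6    # default spark.storage.memoryFraction
cache_gb = num_executors * executor_memory_gb * storage_fraction
# ~27 GB total, far below the ~100 GB of input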

> weight.py:
> from pyspark import SparkConf, SparkContext
> from pyspark.mllib.feature import HashingTF
> from pyspark.mllib.feature import IDF
> from pyspark.mllib.feature import Normalizer
>
> conf = SparkConf() \
>         .set("spark.hadoop.validateOutputSpecs", "false") \
>         .set("spark.yarn.executor.memoryOverhead", "900")
> sc = SparkContext(conf=conf)
>
>
> # reading files from directory 'in/texts.txt' in HDFS
> texts=sc.textFile('in/texts.txt') \
> .map(lambda line: line.split())
>
> hashingTF = HashingTF()

By default HashingTF uses 2^20 (about one million) features. Do you really need
that many? Something like 10k is usually enough for English text.
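
For example, the feature dimension can be set when constructing the transformer;
10000 here is just an illustrative value, not a tuned one:

hashingTF = HashingTF(numFeatures=10000)
tf = hashingTF.transform(texts)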

> tf = hashingTF.transform(texts)
>
> tf.cache()

remove the cache

> idf = IDF(minDocFreq=100).fit(tf)
> tfidf = idf.transform(tf)
>
> n=Normalizer()
>
> normalized=n.transform(tfidf)
>
> def x2((vec, num)):
>     triples=[]
>     for id, weight in zip(vec.indices, vec.values):
>             triples.append((num, id, weight))
>     return triples
>
> # I use zipWithIndex to enumerate documents
> normalized.zipWithIndex() \
> .flatMap(x2) \
> .map(lambda t: '{}\t{}\t{}'.format(t[0],t[1],t[2])) \
> .saveAsTextFile('out/weights.txt')

zipWithIndex() is expensive: it triggers an extra job just to compute the
per-partition offsets. You could use zipWithUniqueId() instead (the resulting
ids are unique but not contiguous).
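
A minimal sketch of that change, reusing the x2 helper from your script (the
ids will be unique but no longer consecutive document numbers):

normalized.zipWithUniqueId() \
    .flatMap(x2) \
    .map(lambda t: '{}\t{}\t{}'.format(t[0], t[1], t[2])) \
    .saveAsTextFile('out/weights.txt')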

> 1) What could be a bottleneck?
> Unfortunately I don't have access to the web UI.
> In the log file I see stages: 0,1,2,3
> Stage 0 "MapPartitionsRDD[6] at mapPartitionsWithIndex at
> RDDFunctions.scala:108" with 584 tasks completed very quick
> Stage 1 "MappedRDD[8] at values at RDDFunctions.scala:110" (23 tasks) -
> quick too
> Stage 2 "zipWithIndex" (584 tasks) was long (17 hours)
> Stage 3 "saveAsTextFile" (584 tasks) - too (still executing about 2 hours)
>
> I don't understand the boundaries of Stages 0 and 1,
> and I don't understand why I see numbers like 584 or 23 tasks in these stages.
>
>
> 2) On a previous run of this task I saw a lot of "executor lost" errors from
> the YARN scheduler. I later added the .set("spark.yarn.executor.memoryOverhead",
> "900") setting in the code and now I see only a few such messages. Could that
> be a reason for the poor performance?
>
> Please advise!
>
> Any explainations appreciated!
>
> Serg.
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/calculating-TF-IDF-for-large-100GB-dataset-problems-tp22144.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
