Hi, I have a 40G file which is a concatenation of multiple documents. I want to extract two features (title and tables) from each doc, so the program is like this:
-------------------------------------------------------------
val file = sc.textFile("/path/to/40G/file")
//file.cache() //to enable or disable cache

val titles = file.map(line => {
  (doc_key, getTitle())  // here I use text utility functions written in Java
}).reduceByKey(_ + _, 1)  // reduce_1

val tables = file.flatMap(line => {
  for (table <- all_tables) yield (doc_key, getTableTitle())  // here I use text utility functions written in Java
}).reduceByKey(_ + _, 1)  // reduce_2

titles.saveAsTextFile("titles.out")  // save_1, will trigger reduce_1
tables.saveAsTextFile("tables.out")  // save_2, will trigger reduce_2
-------------------------------------------------------------

I expected that with file.cache(), the later reduce_2 would be faster, since it reads from cached data. However, the results repeatedly show the opposite: reduce_2 takes 3 min with the cache and 1.4 min without it. Why does reading from the cache not help in this case?

The stage GUI shows that, with the cache enabled, reduce_2 always has a wave of "outlier tasks": the median task duration is 2 s, but the maximum is 1.7 min.

Metric                      Min     25th pct   Median   75th pct   Max
Result serialization time   0 ms    0 ms       0 ms     0 ms       1 ms
Duration                    0.6 s   2 s        2 s      2 s        1.7 min

These outlier tasks do not come with a long GC pause, though (26 ms, as shown in this task row from the UI):

173  1210  SUCCESS  PROCESS_LOCAL  localhost  2014/06/17 17:49:43  1.7 min  26 ms  9.4 KB

BTW: this is a single machine with 32 cores, 192 GB RAM, and an SSD, with these lines in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g
SPARK_JAVA_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:MaxPermSize=256m"

Thanks,
Wei

---------------------------------
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan
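P.S. A variant I am considering to isolate the caching effect (a sketch against the `file` RDD above, assuming the Spark 1.0-era Scala API; using MEMORY_ONLY_SER here is a guess, on the theory that serialized caching keeps fewer live Java objects around and so reduces memory/GC pressure):

```scala
import org.apache.spark.storage.StorageLevel

// Cache partitions as serialized byte arrays instead of deserialized
// Java objects; this is an assumption about reducing GC scan work.
file.persist(StorageLevel.MEMORY_ONLY_SER)

// Force the cache to be fully materialized with a cheap action, so that
// both reduce_1 and reduce_2 are subsequently timed against cached data.
file.count()
```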