Why do executors throw OutOfMemoryError: Java heap space?
Hi all,

Sometimes you can see OutOfMemoryError: Java heap space from an executor in Spark, and there are many ideas about workarounds. My question is: how does an executor execute tasks, from the point of view of memory usage and parallelism?

The picture in my mind is this: an executor is a JVM instance. The number of tasks that can run in parallel threads inside a single executor is controlled by the --executor-cores parameter of spark-submit (in the case of YARN). Each executor owns --executor-memory of memory, which is divided into memory for the RDD cache and memory for task execution. I am not considering caching here; what interests me is how the memory for task execution is used while the executor is working.

Let's consider an example with only map operations, no joins / group / reduce and no caching:

    sc.textFile('test.txt') \
        .map(lambda line: line.split()) \
        .map(lambda items: [int(x) + 10 for x in items]) \
        .saveAsTextFile('out.txt')

How will the input RDD be processed in this case? I know RDDs are divided into P partitions by some rule (for example, by the HDFS block size). So we will have P partitions, P tasks and 1 stage (am I right?). Let --executor-cores be 2. In this case the executor will process two partitions in parallel. Will it try to load entire partitions into memory, or will it just call the chain of map functions for each element of a partition? What can cause OutOfMemoryError: Java heap space in this case: a large partition, or a large amount of memory consumed while processing a single element of the RDD?

Please correct me and advise.

Serg.
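To make the question concrete: the workaround most often suggested to me is to ask for more, smaller partitions, so that each task holds less data at once. A minimal sketch of that, assuming partition size really is what drives the heap usage (the partition counts below are made-up values):

    # hypothetical: request more input partitions when reading the file
    rdd = sc.textFile('test.txt', minPartitions=200)
    # or split an existing RDD into smaller pieces before the heavy map
    rdd = rdd.repartition(200)

But I would still like to understand whether this actually changes how much of a partition is held in memory at a time.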
Which RDD operations preserve ordering?
Hi guys,

I don't have a clear picture of whether the ordering of RDD elements is preserved after executing operations. Which operations preserve it?

1) map (yes?)
2) zipWithIndex (yes, or only sometimes?)

Serg.
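For reference, a tiny sketch of how the question can be checked on toy data (the values and partition count are made up; it demonstrates the two operations in question but obviously proves nothing in general):

    data = sc.parallelize(range(10), 3)            # small RDD with 3 partitions, elements 0..9 in order
    print(data.map(lambda x: x * 2).collect())     # comes back as [0, 2, 4, ..., 18] here
    print(data.zipWithIndex().collect())           # pairs each element with its original position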
Spark UI tunneling
Is there a way to tunnel the Spark UI? I tried to tunnel client-node:4040, but my browser was redirected from localhost to a domain name that is only resolvable inside the cluster. Maybe there is some startup option that makes the Spark UI fully accessible through a single endpoint (address:port)?

Serg.
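For completeness, what I mean by tunneling is a plain SSH local port forward along these lines (the host names gateway and client-node are placeholders):

    ssh -L 4040:client-node:4040 user@gateway
    # then open http://localhost:4040 locally; the problem is that links in the UI
    # redirect to a hostname that only resolves inside the cluster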
log files of failed task
Hi,

I executed a task on Spark on YARN and it failed. I see just an "executor lost" message from YARNClientScheduler, with no further details. (I read this error can be connected to the spark.yarn.executor.memoryOverhead setting and have already played with that parameter.)

How can I dig deeper into the log files and find the exact reason? How can the log of a failed task be examined? Unfortunately I don't have access to the Spark UI and can only use the command line.

Thanks!

Serg.
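For the archives: on YARN the aggregated container logs can usually be pulled from the command line once the application has finished, assuming log aggregation is enabled on the cluster. A sketch, where the application id is a placeholder:

    yarn logs -applicationId application_1425000000000_0001

The executor stderr inside those logs is typically where the real failure (for example an OutOfMemoryError, or a container killed for exceeding memory limits) shows up.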
Problems calculating TF-IDF for a large (100GB) dataset
Hi,

I am trying to vectorize, on a YARN cluster, a corpus of texts (about 500K texts in 13 files, 100GB in total) located in HDFS. The job has already taken about 20 hours on a 3-node cluster with 6 cores and 20GB of RAM on each node. In my opinion that is too long :-)

I started the task with the following command:

    spark-submit --master yarn --num-executors 9 --executor-memory 5GB \
        --executor-cores 2 --driver-memory 5GB weight.py

weight.py:

    from pyspark import SparkConf, SparkContext
    from pyspark.mllib.feature import HashingTF
    from pyspark.mllib.feature import IDF
    from pyspark.mllib.feature import Normalizer

    conf = SparkConf() \
        .set('spark.hadoop.validateOutputSpecs', 'false') \
        .set('spark.yarn.executor.memoryOverhead', 900)
    sc = SparkContext(conf=conf)

    # reading files from directory 'in/texts.txt' in HDFS
    texts = sc.textFile('in/texts.txt') \
        .map(lambda line: line.split())

    hashingTF = HashingTF()
    tf = hashingTF.transform(texts)
    tf.cache()

    idf = IDF(minDocFreq=100).fit(tf)
    tfidf = idf.transform(tf)

    n = Normalizer()
    normalized = n.transform(tfidf)

    def x2((vec, num)):
        triples = []
        for id, weight in zip(vec.indices, vec.values):
            triples.append((num, id, weight))
        return triples

    # I use zipWithIndex to enumerate documents
    normalized.zipWithIndex() \
        .flatMap(x2) \
        .map(lambda t: '{}\t{}\t{}'.format(t[0], t[1], t[2])) \
        .saveAsTextFile('out/weights.txt')

1) What could be the bottleneck? Unfortunately I don't have access to the web UI. In the log file I see stages 0, 1, 2, 3:

    Stage 0: MapPartitionsRDD[6] at mapPartitionsWithIndex at RDDFunctions.scala:108, 584 tasks - completed very quickly
    Stage 1: MappedRDD[8] at values at RDDFunctions.scala:110, 23 tasks - quick too
    Stage 2: zipWithIndex, 584 tasks - long (17 hours)
    Stage 3: saveAsTextFile, 584 tasks - long as well (still executing after about 2 hours)

I don't understand the boundaries of stages 0 and 1, and I don't understand why I see numbers like 584 or 23 tasks on these stages.

2) On a previous run of this task I saw a lot of "executor lost" errors from the YARN scheduler. Later I added the .set('spark.yarn.executor.memoryOverhead', 900) setting in the code, and now I see only a few such messages. Could that be a reason for the poor performance?

Please advise! Any explanations appreciated!

Serg.
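For reference, here is a sketch of two tweaks that are sometimes suggested for this kind of pipeline; the hashing dimension below is an illustrative value, and I have not verified that either change helps. The idea is to shrink the hashing space so each TF vector is smaller, and to let the cached TF vectors spill to disk instead of being dropped:

    from pyspark import StorageLevel
    from pyspark.mllib.feature import HashingTF

    # smaller feature space than the default 2^20; the value is purely illustrative
    hashingTF = HashingTF(numFeatures=2 ** 18)
    tf = hashingTF.transform(texts)
    # cached partitions that don't fit in memory are written to local disk
    # instead of being recomputed through the whole pipeline
    tf.persist(StorageLevel.MEMORY_AND_DISK)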
RDD ordering after map
Does map(...) preserve the ordering of the original RDD?
MEMORY_ONLY vs MEMORY_AND_DISK
Which persistence level is better if the RDD to be cached is expensive to recompute? Am I right that it is MEMORY_AND_DISK?
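For concreteness, a minimal sketch of how the two levels are requested in PySpark (rdd_a and rdd_b are placeholders for RDDs built elsewhere; which level wins for an expensive-to-recompute RDD is exactly my question):

    from pyspark import StorageLevel

    # rdd_a / rdd_b: placeholder RDDs, not from the original post
    # cached partitions that don't fit in memory are dropped and recomputed from lineage
    rdd_a.persist(StorageLevel.MEMORY_ONLY)
    # partitions that don't fit in memory are spilled to local disk and read back later
    rdd_b.persist(StorageLevel.MEMORY_AND_DISK)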
Processing of text file in large gzip archive
I have a 30GB gzip file (originally a text file where each line represents a text document) in HDFS, and Spark 1.2.0 on a YARN cluster with 3 worker nodes, each with 64GB of RAM and 4 cores. The replication factor for my file is 3.

I tried to implement a simple pyspark script to parse this file and represent it as tf-idf, something like:

    lines = sc.textFile('file.gz')
    docs = lines.map(lambda line: line.split(' '))

    hashingTF = HashingTF()
    tf = hashingTF.transform(docs)
    tf.cache()

    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)

    tfidf.map(lambda v: ' '.join(u'{}:{}'.format(i, w) for i, w in zip(v.indices, v.values))) \
        .saveAsTextFile('tfidf.txt')

I started the script with:

    spark-submit --master yarn --num-executors 24 script.py

No comments about why I selected 24 executors - that is just a first try. I saw in the output that all 24 executors and the corresponding block managers with 0.5GB each were started on the 3 nodes, but the output stops at the messages:

    INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on node3:36765 (size: 49.7 KB, free: 530.0 MB)
    INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on node3:36765 (size: 21.6 KB, free: 529.9 MB)

I have been waiting for about an hour already and don't see any changes. (Unfortunately I cannot monitor the cluster via the web UI.)

My main questions: is this, generally speaking, a normal processing time for this volume of data on such a cluster? Is it ok that the output stops at "Added broadcast..."? Is it ok to read a gzip archive via sc.textFile(..) in code, or is it better (for performance) to unpack it first? How can I monitor a Spark job via the command line?

Please advise about some tuning. Thanks!

Sergey.
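One detail that may matter here: a plain .gz file is not splittable, so reading it with textFile gives a single partition and a single task no matter how many executors are started. A minimal sketch of the usual workaround, assuming that really is the situation (the partition count of 48 is an arbitrary example):

    lines = sc.textFile('file.gz')
    print(lines.getNumPartitions())    # typically 1 for a non-splittable .gz file
    # fan the data out after the single decompression task so later stages run in parallel
    docs = lines.repartition(48).map(lambda line: line.split(' '))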
SVD transform of large matrix with MLlib
Has anybody used SVD from MLlib for a very large (like 10^6 x 10^7) sparse matrix? How long did it take? Which implementation of SVD is used in MLlib?
Cannot submit job to Spark on Windows
Hi!

I downloaded and extracted Spark to a local folder under Windows 7 and have successfully played with it in the pyspark interactive shell.

BUT when I try to use spark-submit (for example: spark-submit pi.py) I get:

    C:\spark-1.2.1-bin-hadoop2.4\bin>spark-submit.cmd pi.py
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    15/02/26 18:21:37 INFO SecurityManager: Changing view acls to: sergun
    15/02/26 18:21:37 INFO SecurityManager: Changing modify acls to: sergun
    15/02/26 18:21:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sergun); users with modify permissions: Set(user)
    15/02/26 18:21:38 INFO Slf4jLogger: Slf4jLogger started
    15/02/26 18:21:38 INFO Remoting: Starting remoting
    15/02/26 18:21:39 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@mypc:56640]
    15/02/26 18:21:39 INFO Utils: Successfully started service 'sparkDriver' on port 56640.
    15/02/26 18:21:39 INFO SparkEnv: Registering MapOutputTracker
    15/02/26 18:21:39 INFO SparkEnv: Registering BlockManagerMaster
    15/02/26 18:21:39 INFO DiskBlockManager: Created local directory at C:\Users\sergun\AppData\Local\Temp\spark-adddeb0b-d6c8-4720-92e3-05255d46ea66\spark-c65cd406-28a4-486d-a1ad-92e4814df6fa
    15/02/26 18:21:39 INFO MemoryStore: MemoryStore started with capacity 265.0 MB
    15/02/26 18:21:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/02/26 18:21:40 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable C:\\bin\winutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
        at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
        at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
        at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283)
        at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:44)
        at org.apache.spark.deploy.SparkHadoopUtil$.<init>(SparkHadoopUtil.scala:214)
        at org.apache.spark.deploy.SparkHadoopUtil$.<clinit>(SparkHadoopUtil.scala)
        at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:1873)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:180)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:308)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:159)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:240)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.lang.reflect.Constructor.newInstance(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)
    15/02/26 18:21:41 INFO HttpFileServer: HTTP File server directory is C:\Users\sergun\AppData\Local\Temp\spark-79f2a924-4fff-432c-abc8-ac9c6c4ee0c7\spark-1f295e28-f0db-4daf-b877-2a47990b6e88
    15/02/26 18:21:41 INFO HttpServer: Starting HTTP Server
    15/02/26 18:21:41 INFO Utils: Successfully started service 'HTTP file server' on port 56641.
    15/02/26 18:21:41 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    15/02/26 18:21:41 INFO SparkUI: Started SparkUI at http://mypc:4040
    15/02/26 18:21:42 INFO Utils: Copying C:\spark-1.2.1-bin-hadoop2.4\bin\pi.py to C:\Users\sergun\AppData\Local\Temp\spark-76a21028-ccce-4308-9e70-09c3cfa76477\spark-56b32155-2779-4345-9597-2bfa6a87a51d\pi.py
    Traceback (most recent call last):
      File "C:/spark-1.2.1-bin-hadoop2.4/bin/pi.py", line 29, in <module>
        sc = SparkContext(appName="PythonPi")
      File "C:\spark-1.2.1-bin-hadoop2.4\python\pyspark\context.py", line 105, in __init__
        conf, jsc)
      File
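For anyone hitting the same trace: the "Failed to locate the winutils binary" error usually means the Hadoop code bundled with Spark cannot find a Windows helper binary. A sketch of the commonly suggested workaround, assuming winutils.exe has been downloaded to C:\hadoop\bin (the path is a placeholder, not something from my setup):

    rem winutils.exe is expected at %HADOOP_HOME%\bin\winutils.exe
    set HADOOP_HOME=C:\hadoop
    C:\spark-1.2.1-bin-hadoop2.4\bin\spark-submit.cmd pi.py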