Re: RDD Blocks skewing to just a few executors
Hi All,

I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions). Could you please help us with this? Thanks

Alessandro

On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote:

Hi,

I'm running a standalone cluster with 8 worker servers. I'm developing a streaming app that adds new lines of text to several different RDDs each batch interval. Each line has a well-randomized unique identifier that I'm trying to use for partitioning, since the data stream does contain duplicate lines.

I'm doing the partitioning with this:

    val eventsByKey = streamRDD.map { event => (getUID(event), event) }
    val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
      .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

I'm adding to the existing RDD with this:

    import scala.collection.mutable.{ListBuffer, Map}

    val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
      (currentIter, batchIter) =>
        val uniqEvents = ListBuffer[String]()
        val uids = Map[String, Boolean]()
        Array(currentIter, batchIter).foreach { iter =>
          iter.foreach { event =>
            val uid = getUID(event)
            if (!uids.contains(uid)) {
              uids(uid) = true
              uniqEvents += event
            }
          }
        }
        uniqEvents.iterator
    }
    val count = mergedRDD.count

The reason I'm doing it this way is that when I was doing:

    val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
    val count = mergedRDD.count

it would start taking a long time, with a lot of shuffles. The zipPartitions approach does perform better, though after running for an hour or so I start seeing this in the web UI:

http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png

As you can see, most of the data is skewing to just 2 executors, with 1 getting more than half the blocks. These become a hotspot and eventually I start seeing OOM errors. I've tried this half a dozen times; the 'hot' executors change, but not the skewing behavior. Any idea what is going on here?

Thanks,

Mike

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
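A quick way to tell whether the HashPartitioner itself is distributing keys unevenly (as opposed to Spark's block placement across executors) is to count the records in each partition. A minimal diagnostic sketch, reusing the partionedEventsRdd name from the code above; everything else about the job is assumed:

    // Count records per partition; roughly equal counts would point the
    // skew at block placement/scheduling rather than at the key distribution.
    val perPartition = partionedEventsRdd
      .mapPartitionsWithIndex(
        (idx, iter) => Iterator((idx, iter.size)),
        preservesPartitioning = true)
      .collect()
      .sortBy(_._1)
    perPartition.foreach { case (idx, n) => println(s"partition $idx: $n records") }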
Re: RDD Partition number
Hi All,

Thanks for your answers. I have one more detail to point out. It is now clear how the partition number is defined for an HDFS file. However, suppose my dataset is replicated on all the machines at the same absolute path, and each machine has, for instance, an ext3 filesystem. If I load the file into an RDD, how many partitions are defined in this case, and why? I found that Spark defines some number of partitions, say K. If I force the partition count to a value below K, my parameter is ignored. If I set a value K* >= K, then Spark creates K* partitions.

Thanks for your help

Alessandro

On Thu, Feb 19, 2015 at 6:27 PM, Ted Yu yuzhih...@gmail.com wrote:

bq. *blocks being 64MB by default in HDFS*

In Hadoop 2.1+, the default block size has been increased. See https://issues.apache.org/jira/browse/HDFS-4053

Cheers

On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote:

What file system are you using? If you use HDFS, the documentation you cited is pretty clear on how partitions are determined.

bq. file X replicated on 4 machines

I don't think the replication factor plays a role w.r.t. partitions.

On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it wrote:

Hi All,

Could you please help me understand how Spark defines the number of partitions of an RDD when it is not specified? I found the following in the documentation for files loaded from HDFS:

*The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.*

What is the rule for files loaded from other file systems? For instance, I have a file X replicated on 4 machines. If I load the file X into an RDD, how many partitions are defined, and why?

Thanks for your help on this

Alessandro
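The observed behavior is easy to reproduce, because textFile's second argument is a minimum, not an exact count: Spark hands it to Hadoop's FileInputFormat, which computes splits from the file's size even for a local (ext3) path, so a value below the computed split count K is ignored, as described above. A small sketch, assuming an existing SparkContext sc and a hypothetical local file /data/X:

    // Default: Spark picks K partitions from the input splits that
    // FileInputFormat computes from the file size and block-size settings.
    val byDefault = sc.textFile("/data/X")
    println(byDefault.partitions.length)     // K

    // minPartitions above K is honored; a value below K is ignored.
    val wider = sc.textFile("/data/X", 64)
    println(wider.partitions.length)         // >= 64, provided 64 >= K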
RDD Partition number
Hi All,

Could you please help me understand how Spark defines the number of partitions of an RDD when it is not specified? I found the following in the documentation for files loaded from HDFS:

*The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.*

What is the rule for files loaded from other file systems? For instance, I have a file X replicated on 4 machines. If I load the file X into an RDD, how many partitions are defined, and why?

Thanks for your help on this

Alessandro
Re: Job aborted due to stage failure: TID x failed for unknown reasons
Hi All,

Can someone help with this? I'm encountering exactly the same issue in a very similar scenario with the same Spark version. Thanks

Alessandro

On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn squ...@gatech.edu wrote:

Hi all,

I'm dealing with some strange error messages that I *think* come down to a memory issue, but I'm having a hard time pinning it down and could use some guidance from the experts.

I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one has 16GB of memory, the other 32GB (which is the master). My application involves computing pairwise pixel affinities in images, though the images I've tested so far only get as big as 1920x1200, and as small as 16x16.

I did have to change a few memory and parallelism settings, otherwise I was getting explicit OutOfMemoryExceptions. In spark-defaults.conf:

    spark.executor.memory      14g
    spark.default.parallelism  32
    spark.akka.frameSize       1000

In spark-env.sh:

    SPARK_DRIVER_MEMORY=10G

With those settings, however, I get a bunch of WARN statements about lost TIDs (no task is successfully completed), in addition to lost executors, which are repeated 4 times until I finally get the following error message and crash:

---
14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at /home/user/Programming/PySpark-Affinities/affinity.py:243
Traceback (most recent call last):
  File "/home/user/Programming/PySpark-Affinities/affinity.py", line 243, in <module>
    lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py", line 583, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:13 failed 4 times, most recent failure: *TID 32 on host master.host.univ.edu failed for unknown reason*
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor 4 from BlockManagerMaster.
14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in removeExecutor
user@master:~/Programming/PySpark-Affinities$
---

If I run the really small image instead (16x16), it *appears* to run to completion (it gives me the output I expect without any exceptions being thrown). However, in the stderr logs for the app that was run, it lists the state as KILLED, with the final message "ERROR CoarseGrainedExecutorBackend: Driver Disassociated". If I run any larger images, I get the exception pasted above. Furthermore, if I just do a spark-submit with master=local[*], aside from still needing to set the aforementioned memory options, it will work for an image of *any*
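The crash happens inside collect(), which ships every partition's results back to the driver and, in Spark 1.x, is additionally constrained by spark.akka.frameSize; for pairwise pixel affinities the result volume grows quadratically with image size, so the driver is the first thing to give out. One general remedy is to keep the results distributed and pull back only a bounded preview. A minimal sketch of that idea in Scala (the thread's app is PySpark; all names, paths, and the toy affinity computation here are hypothetical):

    import org.apache.spark.{SparkConf, SparkContext}

    object AffinitySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("affinity-sketch"))
        // Toy stand-in for per-pixel values keyed by index.
        val pixels = sc.parallelize(0 until 1000).map(i => (i, (i % 7).toDouble))
        // Pairwise absolute differences: output is quadratic in the input size.
        val affinities = pixels.cartesian(pixels)
          .map { case ((i, a), (j, b)) => ((i, j), math.abs(a - b)) }
        // Write from the executors instead of collect()ing through the driver.
        affinities.saveAsTextFile("/tmp/affinities-sketch")
        // take(n) pulls only a bounded sample back to the driver.
        affinities.take(5).foreach(println)
        sc.stop()
      }
    }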