Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Alessandro Lulli
Hi All,

I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions).

Could you please help us on this?

Thanks
Alessandro

On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote:

 Hi, I'm running a standalone cluster with 8 worker servers.
 I'm developing a streaming app that is adding new lines of text to several
 different RDDs each batch interval. Each line has a well-randomized unique
 identifier that I'm trying to use for partitioning, since the data stream
 does contain duplicate lines. I'm doing the partitioning with this:

 val eventsByKey = streamRDD.map { event => (getUID(event), event) }
 val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
   .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

 I'm adding to the existing RDD with this:

 val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
   (currentIter, batchIter) =>
     // uses scala.collection.mutable.{ListBuffer, Map}
     val uniqEvents = ListBuffer[String]()
     val uids = Map[String, Boolean]()
     Array(currentIter, batchIter).foreach { iter =>
       iter.foreach { event =>
         val uid = getUID(event)
         if (!uids.contains(uid)) {
           uids(uid) = true
           uniqEvents += event
         }
       }
     }
     uniqEvents.iterator
 }

 val count = mergedRDD.count

 The reason I'm doing it this way is that when I was doing:

 val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
 val count = mergedRDD.count

 It would start taking a long time and generating a lot of shuffles.
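
 (For reference, the same dedup can also be expressed as a keyed, co-partitioned
 merge. This is only a rough, untested sketch, reusing getUID, numPartions,
 currentRDD and batchRDD from above, and it assumes the accumulated RDD is kept
 keyed by UID with the same partitioner between batches:)

 val partitioner = new HashPartitioner(numPartions)

 // Key both sides by UID; only the (small) new batch should need a shuffle
 // if the accumulated side already carries this partitioner.
 val keyedCurrent = currentRDD.map(event => (getUID(event), event))
   .partitionBy(partitioner)
 val keyedBatch = batchRDD.map(event => (getUID(event), event))
   .partitionBy(partitioner)

 // cogroup over RDDs that share the partitioner is a narrow dependency;
 // keep one event per UID, preferring the one already accumulated.
 val mergedByUid = keyedCurrent.cogroup(keyedBatch, partitioner).mapValues {
   case (current, batch) => current.headOption.getOrElse(batch.head)
 }
 val dedupedRDD = mergedByUid.values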

 The zipPartitions approach does perform better, though after running for an
 hour or so I start seeing this in the web UI:

 
 [Screenshot of the Executors tab:]
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png
 

 As you can see, most of the data is skewing to just 2 executors, with 1
 getting more than half the blocks. These become a hotspot and eventually I
 start seeing OOM errors. I've tried this half a dozen times; the 'hot'
 executors change, but the skewing behavior does not.
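
 (A small diagnostic sketch, not part of the original app: count the records per
 partition to see whether the data itself is skewed, or only its block placement
 across executors:)

 val partitionSizes = mergedRDD
   .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
   .collect()
   .sortBy(-_._2)
 partitionSizes.take(10).foreach { case (idx, n) =>
   println(s"partition $idx: $n records")
 }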

 Any idea what is going on here?

 Thanks,

 Mike








Re: RDD Partition number

2015-02-20 Thread Alessandro Lulli
Hi All,

Thanks for your answers.
I have one more detail to point out.

It is now clear how the partition number is defined for an HDFS file.

However, suppose my dataset is replicated on all the machines at the same
absolute path, and each machine uses a local filesystem such as ext3.

If I load the file into an RDD, how many partitions are defined in this
case, and why?

I found that Spark defines some number of partitions, say K. If I force the
number of partitions to be <= K, my parameter is ignored.
If I set a value K* >= K, then Spark uses K* partitions.
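
For illustration, a small sketch (hypothetical path and values) of the
behaviour I observe, where the second argument of textFile behaves as a
lower bound:

val default = sc.textFile("/data/shared/fileX.txt")      // Spark chooses K partitions
val fewer = sc.textFile("/data/shared/fileX.txt", 2)     // asking for fewer than K is ignored
val more = sc.textFile("/data/shared/fileX.txt", 100)    // asking for K* >= K gives about K*

println(default.partitions.length)
println(fewer.partitions.length)   // still K
println(more.partitions.length)    // about 100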

Thanks for your help
Alessandro


On Thu, Feb 19, 2015 at 6:27 PM, Ted Yu yuzhih...@gmail.com wrote:

 bq. *blocks being 64MB by default in HDFS*


 *In Hadoop 2.1+, the default block size has been increased (to 128 MB).*
 See https://issues.apache.org/jira/browse/HDFS-4053

 Cheers

 On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote:

 What file system are you using?

 If you use HDFS, the documentation you cited is pretty clear on how
 partitions are determined.

 bq. file X replicated on 4 machines

 I don't think replication factor plays a role w.r.t. partitions.

 On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it
 wrote:

 Hi All,

 Could you please help me understand how Spark defines the number of
 partitions of an RDD when it is not specified?

 I found the following in the documentation for files loaded from HDFS:
 *The textFile method also takes an optional second argument for
 controlling the number of partitions of the file. By default, Spark creates
 one partition for each block of the file (blocks being 64MB by default in
 HDFS), but you can also ask for a higher number of partitions by passing a
 larger value. Note that you cannot have fewer partitions than blocks*

 What is the rule for a file loaded from the local file system?
 For instance, I have a file X replicated on 4 machines. If I load file X
 into an RDD, how many partitions are defined, and why?

 Thanks for your help on this
 Alessandro






RDD Partition number

2015-02-19 Thread Alessandro Lulli
Hi All,

Could you please help me understand how Spark defines the number of
partitions of an RDD when it is not specified?

I found the following in the documentation for files loaded from HDFS:
*The textFile method also takes an optional second argument for controlling
the number of partitions of the file. By default, Spark creates one
partition for each block of the file (blocks being 64MB by default in
HDFS), but you can also ask for a higher number of partitions by passing a
larger value. Note that you cannot have fewer partitions than blocks*

What is the rule for a file loaded from the local file system?
For instance, I have a file X replicated on 4 machines. If I load file X
into an RDD, how many partitions are defined, and why?

Thanks for your help on this
Alessandro


Re: Job aborted due to stage failure: TID x failed for unknown reasons

2014-07-22 Thread Alessandro Lulli
Hi All,

Can someone help on this?

I'm encountering exactly the same issue in a very similar scenario with the
same Spark version.

Thanks
Alessandro


On Fri, Jul 18, 2014 at 8:30 PM, Shannon Quinn squ...@gatech.edu wrote:

  Hi all,

 I'm dealing with some strange error messages that I *think* come down to a
 memory issue, but I'm having a hard time pinning it down and could use some
 guidance from the experts.

 I have a 2-machine Spark (1.0.1) cluster. Both machines have 8 cores; one
 has 16GB memory, the other 32GB (which is the master). My application
 involves computing pairwise pixel affinities in images, though the images
 I've tested so far only get as big as 1920x1200, and as small as 16x16.

 I did have to change a few memory and parallelism settings, otherwise I was
 getting explicit OutOfMemoryExceptions. In spark-defaults.conf:

 spark.executor.memory      14g
 spark.default.parallelism  32
 spark.akka.frameSize       1000

 In spark-env.sh:

 SPARK_DRIVER_MEMORY=10G
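
 (The memory settings can equivalently be passed at submit time; a sketch with
 an assumed standalone master URL on the default port, while
 spark.default.parallelism and spark.akka.frameSize stay in spark-defaults.conf:)

 spark-submit \
   --master spark://master.host.univ.edu:7077 \
   --driver-memory 10g \
   --executor-memory 14g \
   affinity.py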

 With those settings, however, I get a bunch of WARN statements about Lost
 TIDs (no task is successfully completed) in addition to lost Executors,
 which are repeated 4 times until I finally get the following error message
 and crash:

 ---

 14/07/18 12:06:20 INFO TaskSchedulerImpl: Cancelling stage 0
 14/07/18 12:06:20 INFO DAGScheduler: Failed to run collect at
 /home/user/Programming/PySpark-Affinities/affinity.py:243
 Traceback (most recent call last):
   File /home/user/Programming/PySpark-Affinities/affinity.py, line 243,
 in <module>
 lambda x: np.abs(IMAGE.value[x[0]] - IMAGE.value[x[1]])
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/pyspark/rdd.py,
 line 583, in collect
 bytesInJava = self._jrdd.collect().iterator()
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py,
 line 537, in __call__
   File
 /net/antonin/home/user/Spark/spark-1.0.1-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py,
 line 300, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o27.collect.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0:13 failed 4 times, most recent failure: *TID 32 on host
 master.host.univ.edu failed for unknown reason*
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 14/07/18 12:06:20 INFO DAGScheduler: Executor lost: 4 (epoch 4)
 14/07/18 12:06:20 INFO BlockManagerMasterActor: Trying to remove executor
 4 from BlockManagerMaster.
 14/07/18 12:06:20 INFO BlockManagerMaster: Removed 4 successfully in
 removeExecutor
 user@master:~/Programming/PySpark-Affinities$

 ---

 If I run the really small image instead (16x16), it *appears* to run to
 completion (gives me the output I expect without any exceptions being
 thrown). However, in the stderr logs for the app that was run, it lists the
 state as KILLED, with the final message being ERROR
 CoarseGrainedExecutorBackend: Driver Disassociated. If I run any larger
 images, I get the exception I pasted above.

 Furthermore, if I just do a spark-submit with master=local[*], aside from
 still needing to set the aforementioned memory options, it will work for an
 image of *any*