Re: Shuffle on joining two RDDs

2015-02-13 Thread Imran Rashid
yeah I thought the same thing at first too, I suggested something
equivalent w/ preservesPartitioning = true, but that isn't enough.  the
join is done by union-ing the two transformed rdds, which is very different
from the way it works under the hood in scala to enable narrow
dependencies.  It really needs a bigger change to pyspark.  I filed this
issue: https://issues.apache.org/jira/browse/SPARK-5785

(and the somewhat related issue about documentation:
https://issues.apache.org/jira/browse/SPARK-5786)

partitioning should still work in pyspark, you still need some notion of
distributing work, and the pyspark functions have a partitionFunc to decide
that.  But, I am not an authority on pyspark, so perhaps there are more
holes I'm not aware of ...
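
To make the partitioning point concrete, here is a minimal pure-Python sketch (it does not use PySpark) of why a values-only transformation such as mapValues can keep an existing partitioning while a general map cannot promise to. The helper names (partition_by, map_values, is_partitioned) are made up for illustration, and Python's built-in hash is only a stand-in for whatever partitionFunc is actually in use. As noted above, this property alone is not enough to make the PySpark join shuffle-free.

```python
def partition_index(key, num_partitions, partition_func=hash):
    """Pick the partition a key belongs to (stand-in for a partitionFunc)."""
    return partition_func(key) % num_partitions


def partition_by(records, num_partitions, partition_func=hash):
    """Distribute (key, value) records into num_partitions buckets by key."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in records:
        parts[partition_index(key, num_partitions, partition_func)].append((key, value))
    return parts


def map_values(parts, f):
    """Transform values only; keys are untouched, so no record has to move."""
    return [[(k, f(v)) for k, v in part] for part in parts]


def is_partitioned(parts, partition_func=hash):
    """True if every record sits in the partition its key hashes to."""
    n = len(parts)
    return all(
        partition_index(k, n, partition_func) == i
        for i, part in enumerate(parts)
        for k, _ in part
    )


records = [(i, "v%d" % i) for i in range(20)]
parts = partition_by(records, 4)

# Values-only change: the layout is still valid for the same partition function.
tagged = map_values(parts, lambda v: (1, v))

# A general map may change keys, so the old layout can no longer be trusted.
rekeyed = [[(k + 1, v) for k, v in part] for part in parts]

print(is_partitioned(tagged))
print(is_partitioned(rekeyed))
```

The second check fails because every key was shifted by one, so each record now sits in a partition its new key does not hash to.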

Imran

On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote:

 In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
 wouldn't it help to change the lines

 vs = rdd.map(lambda (k, v): (k, (1, v)))
 ws = other.map(lambda (k, v): (k, (2, v)))

 to

 vs = rdd.mapValues(lambda v: (1, v))
 ws = other.mapValues(lambda v: (2, v))

 ?
 As I understand, this would preserve the original partitioning.



 On 2015-02-13 12:43, Karlson wrote:

 Does that mean partitioning does not work in Python? Or does this only
 affect joining?

 On 2015-02-12 19:27, Davies Liu wrote:

 The feature works as expected in Scala/Java, but is not implemented in
 Python.


Re: Size exceeds Integer.MAX_VALUE exception when broadcasting large variable

2015-02-13 Thread Imran Rashid
unfortunately this is a known issue:
https://issues.apache.org/jira/browse/SPARK-1476

as Sean suggested, you need to think of some other way of doing the same
thing, even if it's just breaking your one big broadcast var into a few
smaller ones
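
As a rough illustration of the "break it into a few smaller ones" idea, the sketch below splits a serialized payload into pieces that each stay under a size limit and checks that they can be put back together. It is plain Python with hypothetical helper names (chunks_needed, split_into_chunks); in a real job each piece would be broadcast separately, and the right way to slice the data depends on how it is looked up.

```python
JVM_MAX_ARRAY_BYTES = 2**31 - 1  # Integer.MAX_VALUE, the cap on a JVM byte array


def chunks_needed(total_bytes, max_chunk_bytes=JVM_MAX_ARRAY_BYTES):
    """Ceiling division: how many pieces are needed to stay under the cap."""
    return -(-total_bytes // max_chunk_bytes)


def split_into_chunks(payload, max_chunk_bytes):
    """Cut a bytes-like payload into consecutive pieces of bounded size."""
    if max_chunk_bytes <= 0:
        raise ValueError("max_chunk_bytes must be positive")
    view = memoryview(payload)
    return [
        bytes(view[i:i + max_chunk_bytes])
        for i in range(0, len(view), max_chunk_bytes)
    ]


# Tiny stand-in payload so the example runs instantly.
payload = bytes(range(10))
chunks = split_into_chunks(payload, 4)

print([len(c) for c in chunks])      # [4, 4, 2]
print(b"".join(chunks) == payload)   # True
print(chunks_needed(5 * 1024**3))    # 3
```

By this arithmetic a 5GB value needs at least three pieces, each of which would then fit in a single byte array.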

On Fri, Feb 13, 2015 at 12:30 PM, Sean Owen so...@cloudera.com wrote:

 I think you've hit the nail on the head. Since the serialization
 ultimately creates a byte array, and arrays can have at most ~2
 billion elements in the JVM, the broadcast can be at most ~2GB.

 At that scale, you might consider whether you really have to broadcast
 these values, or want to handle them as RDDs and join and so on.

 Or consider whether you can break it up into several broadcasts?


 On Fri, Feb 13, 2015 at 6:24 PM, soila skavu...@gmail.com wrote:
  I am trying to broadcast a large 5GB variable using Spark 1.2.0. I get the
  following exception when the size of the broadcast variable exceeds 2GB.
  Any ideas on how I can resolve this issue?

  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
      at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
      at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
      at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
      at org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:99)
      at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:147)
      at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
      at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:787)
      at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:638)
      at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:992)
      at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
      at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
      at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
      at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
      at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
      at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Size-exceeds-Integer-MAX-VALUE-exception-when-broadcasting-large-variable-tp21648.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Re: Counters in Spark

2015-02-13 Thread Imran Rashid
this is more-or-less the best you can do now, but as has been pointed out,
accumulators don't quite fit the bill for counters.  There is an open issue
to do something better, but no progress on that so far

https://issues.apache.org/jira/browse/SPARK-603
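
Mark's point further down the thread (no exactly-once guarantee inside transformations) can be seen in a small simulation. This is plain Python, not Spark: run_task and the dictionary counter are stand-ins invented for illustration, and the "retry" is simply a second call for one partition, as might happen after a failure or under speculative execution.

```python
def run_task(partition, counter):
    """Process one partition and bump a shared counter once per record."""
    for _ in partition:
        counter["records"] += 1
    return len(partition)


partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
counter = {"records": 0}

# First attempt: every partition is processed once.
for partition in partitions:
    run_task(partition, counter)

# Simulated re-execution of the first partition's task.
run_task(partitions[0], counter)

total_records = sum(len(p) for p in partitions)
print(total_records)        # 9
print(counter["records"])   # 12
```

The counter reports 12 even though only 9 records exist, which is why counts updated from inside a transformation are best treated as approximate.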

On Fri, Feb 13, 2015 at 11:12 AM, Mark Hamstra m...@clearstorydata.com
wrote:

 Except that transformations don't have an exactly-once guarantee, so this
 way of doing counters may produce different answers across various forms of
 failures and speculative execution.

 On Fri, Feb 13, 2015 at 8:56 AM, Sean McNamara 
 sean.mcnam...@webtrends.com wrote:

  .map is just a transformation, so no work will actually be performed
 until something takes action against it.  Try adding a .count(), like so:

  inputRDD.map { x => {
    counter += 1
  } }.count()

  In case it is helpful, here are the docs on what exactly the
 transformations and actions are:
 http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
 http://spark.apache.org/docs/1.2.0/programming-guide.html#actions

  Cheers,

  Sean


  On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote:

 I am trying to implement counters in Spark and I guess Accumulators are
 the way to do it.

 My motive is to update a counter in the map function and access/reset it in
 the driver code. However, the println statement at the end still yields the
 value 0 (it should be 9). Am I doing something wrong?

 def main(args: Array[String]) {

   val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
   val sc = new SparkContext(conf)
   var counter = sc.accumulable(0, "Counter")
   var inputFilePath = args(0)
   val inputRDD = sc.textFile(inputFilePath)

   inputRDD.map { x => {
     counter += 1
   } }
   println(counter.value)
 }




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Counters-in-Spark-tp21646.html






Re: Master dies after program finishes normally

2015-02-12 Thread Imran Rashid
The important thing here is the master's memory, that's where you're
getting the GC overhead limit.  The master is updating its UI to include
your finished app when your app finishes, which would cause a spike in
memory usage.

I wouldn't expect the master to need a ton of memory just to serve the UI
for a modest number of small apps, but maybe some of your apps have a lot
of jobs, stages, or tasks.  And there is always lots of overhead from the
jvm, so bumping it up might help.

On Thu, Feb 12, 2015 at 1:25 PM, Manas Kar manasdebashis...@gmail.com
wrote:

 I have 5 workers, each with 8GB of executor memory. My driver memory is 8
 GB as well. They are all 8-core machines.

 To answer Imran's question my configurations are thus.
 executor_total_max_heapsize = 18GB
 This problem happens at the end of my program.

 I don't have to run a lot of jobs to see this behaviour.
 I can see my output correctly in HDFS and all.
 I will give it one more try after increasing the master's memory (from the
 default 296MB to 512MB).

 ..manas

 On Thu, Feb 12, 2015 at 2:14 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 How many nodes do you have in your cluster, how many cores, what is the
 size of the memory?

 On Fri, Feb 13, 2015 at 12:42 AM, Manas Kar manasdebashis...@gmail.com
 wrote:

 Hi Arush,
  Mine is a CDH5.3 with Spark 1.2.
 The only changes to my Spark programs are
 -Dspark.driver.maxResultSize=3g -Dspark.akka.frameSize=1000.

 ..Manas

 On Thu, Feb 12, 2015 at 2:05 PM, Arush Kharbanda 
 ar...@sigmoidanalytics.com wrote:

 What is your cluster configuration? Did you try looking at the Web UI?
 There are many tips here

 http://spark.apache.org/docs/1.2.0/tuning.html

 Did you try these?

 On Fri, Feb 13, 2015 at 12:09 AM, Manas Kar manasdebashis...@gmail.com
  wrote:

 Hi,
  I have a Hidden Markov Model running with 200MB data.
  Once the program finishes (i.e. all stages/jobs are done) the program
 hangs for 20 minutes or so before killing master.

 In the spark master the following log appears.

 2015-02-12 13:00:05,035 ERROR akka.actor.ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-31] shutting down ActorSystem [sparkMaster]
 java.lang.OutOfMemoryError: GC overhead limit exceeded
     at scala.collection.immutable.List$.newBuilder(List.scala:396)
     at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:69)
     at scala.collection.AbstractTraversable.genericBuilder(Traversable.scala:105)
     at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:58)
     at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:53)
     at scala.collection.TraversableLike$class.builder$1(TraversableLike.scala:239)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:243)
     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
     at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:26)
     at org.json4s.MonadicJValue$$anonfun$org$json4s$MonadicJValue$$findDirectByName$1.apply(MonadicJValue.scala:22)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
     at org.json4s.MonadicJValue.org$json4s$MonadicJValue$$findDirectByName(MonadicJValue.scala:22)
     at org.json4s.MonadicJValue.$bslash(MonadicJValue.scala:16)
     at org.apache.spark.util.JsonProtocol$.taskStartFromJson(JsonProtocol.scala:450)
     at org.apache.spark.util.JsonProtocol$.sparkEventFromJson(JsonProtocol.scala:423)
     at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:71)
     at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2$$anonfun$apply$1.apply(ReplayListenerBus.scala:69)
     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
     at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:69)
     at org.apache.spark.scheduler.ReplayListenerBus$$anonfun$replay$2.apply(ReplayListenerBus.scala:55)
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
     at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:55)
     at
 

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote:

 ah, sorry, I am not too familiar w/ pyspark and I missed that part.  It
 could be that pyspark doesn't properly support narrow dependencies, or
 maybe you need to be more explicit about the partitioner.  I am looking
 into the pyspark api, but you might have some better guesses here than I
 do.

 My suggestion to do

 joinedRdd.getPartitions.foreach{println}

 was just to see if the partition was a NarrowCoGroupSplitDep or a
 ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
 fields are hidden deeper inside and are not user-visible.  But I think a
 better way (in scala, anyway) is to look at rdd.dependencies.  It's a little
 tricky, though: you need to look deep into the lineage (example at the end).

 Sean -- yes, it does require that both RDDs have the same partitioner, but
 that should happen naturally if you just specify the same number of
 partitions: you'll get equal HashPartitioners.  There is a little difference
 in the scala & python api that I missed here.  For partitionBy in scala, you
 actually need to specify the partitioner, but not in python.  However, I
 thought it would work like groupByKey, which does just take an int.


 Here's a code example in scala -- not sure what is available from python.
 Hopefully somebody knows a simpler way to confirm narrow dependencies??

 val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
 val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
 scala> d.partitioner == d2.partitioner
 res2: Boolean = true
 val joined = d.join(d2)
 val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
 val badJoined = d.join(d3)

 d.setName("d")
 d2.setName("d2")
 d3.setName("d3")
 joined.setName("joined")
 badJoined.setName("badJoined")


 // unfortunately, just looking at the immediate dependencies of joined &
 // badJoined is misleading, b/c join actually creates one more step after
 // the shuffle
 scala> joined.dependencies
 res20: Seq[org.apache.spark.Dependency[_]] =
 List(org.apache.spark.OneToOneDependency@74751ac8)
 // even with the join that does require a shuffle, we still see a
 // OneToOneDependency, but that's just a simple flatMap step
 scala> badJoined.dependencies
 res21: Seq[org.apache.spark.Dependency[_]] =
 List(org.apache.spark.OneToOneDependency@1cf356cc)


 // so let's make a helper function to get all the dependencies recursively

 import org.apache.spark.Dependency
 import org.apache.spark.rdd.RDD

 def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
   val deps = rdd.dependencies
   // pair this RDD with each direct dependency, then recurse into the parents
   deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
 }


 // full dependencies of the good join

 scala> flattenDeps(joined).foreach{println}
 (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
 (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
 (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
 (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
 (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
 (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
 (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
 (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)


 // full dependencies of the bad join -- notice the ShuffleDependency on
 // the CoGroupedRDD!

 scala> flattenDeps(badJoined).foreach{println}
 (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
 (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
 (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
 (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
 (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
 (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
 (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
 (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
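
 For readers who want to follow the recursion without a Spark shell, here is a toy version of the same lineage walk in plain Python. The Node class, the "narrow"/"shuffle" labels and the join_needs_shuffle helper are all invented for this sketch; they only mimic the shape of the two lineages printed above and are not a Spark API.

```python
class Node:
    """A toy stand-in for an RDD: a name plus (kind, parent) dependencies."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)


def flatten_deps(node):
    """List (node name, dependency kind) for the node and all its ancestors."""
    direct = [(node.name, kind) for kind, _ in node.deps]
    nested = [pair for _, parent in node.deps for pair in flatten_deps(parent)]
    return direct + nested


def join_needs_shuffle(root):
    """True if the cogroup step itself has a shuffle dependency."""
    return any(
        name == "cogroup" and kind == "shuffle"
        for name, kind in flatten_deps(root)
    )


def build_join(left, right, left_kind, right_kind):
    """Mimic join: cogroup -> mapValues -> flatMapValues."""
    cogroup = Node("cogroup", [(left_kind, left), (right_kind, right)])
    mapped = Node("mapValues", [("narrow", cogroup)])
    return Node("joined", [("narrow", mapped)])


d = Node("d", [("shuffle", Node("map"))])
d2 = Node("d2", [("shuffle", Node("map"))])
d3 = Node("d3", [("shuffle", Node("map"))])

good = build_join(d, d2, "narrow", "narrow")   # same partitioner on both sides
bad = build_join(d, d3, "shuffle", "narrow")   # one side must be reshuffled

for pair in flatten_deps(good):
    print(pair)
print(join_needs_shuffle(good))
print(join_needs_shuffle(bad))
```

 The groupByKey shuffles show up in both lineages, so the thing to look for is a shuffle attached to the cogroup step itself.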



 On Thu, Feb 12, 2015 at 10:05 AM, Karlson ksonsp...@siberie.de wrote:

 Hi Imran,

 thanks for your quick reply.

 Actually I am doing this:

 rddA = rddA.partitionBy(n).cache()
 rddB = rddB.partitionBy(n).cache()

 followed by

 rddA.count()
 rddB.count()

 then joinedRDD = rddA.join(rddB)

 I thought that the count() would force the evaluation, so any subsequent
 joins would be shuffleless. I was wrong about the shuffle amounts however.
 The shuffle write is actually 2GB (i.e. the size

Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require
any shuffling.  But it's possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those RDDs
are actually materialized & cached somewhere?  e.g., if you just did this:

val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join.  So
though the join itself can happen without repartitioning, joinedRdd.count()
will trigger the evaluation of rddA & rddB, which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to reshuffle
them for the join.
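
The "unless you persist the result" part is easy to miss, so here is a small plain-Python model of it. LazyDataset is a made-up class, not a Spark type: it recomputes its contents on every action unless persist() was called, and the counters simply record how often the expensive "shuffle" step ran.

```python
class LazyDataset:
    """Toy lazy collection: recomputed on every action unless persisted."""

    def __init__(self, compute):
        self._compute = compute
        self._persist = False
        self._cached = None

    def persist(self):
        self._persist = True
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        result = self._compute()
        if self._persist:
            self._cached = result
        return result


def make_partition_step(run_counter):
    """Return a stand-in for partitionBy that counts how often it runs."""
    def partition_step():
        run_counter["shuffles"] += 1
        return sorted(range(10), key=lambda k: k % 4)
    return partition_step


plain_runs = {"shuffles": 0}
plain = LazyDataset(make_partition_step(plain_runs))
plain.collect()   # e.g. a count() used to "force" evaluation
plain.collect()   # the later join has to redo the work

cached_runs = {"shuffles": 0}
cached = LazyDataset(make_partition_step(cached_runs)).persist()
cached.collect()
cached.collect()  # served from the cache

print(plain_runs["shuffles"])   # 2
print(cached_runs["shuffles"])  # 1
```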

If this doesn't help explain things, for debugging, try

joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least it will tell us whether or
not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson ksonsp...@siberie.de wrote:

 Hi All,

 using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
 other with about 8M records (~2GB)) of the format (key, value).

 I've done a partitionBy(num_partitions) on both RDDs and verified that
 both RDDs have the same number of partitions and that equal keys reside on
 the same partition (via mapPartitionsWithIndex).

 Now I'd expect that for a join on the two RDDs no shuffling is necessary.
 Looking at the Web UI under http://driver:4040 however reveals that that
 assumption is false.

 In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.

 What's the explanation for that behaviour? Where am I wrong with my
 assumption?

 Thanks in advance,

 Karlson





Re: saveAsHadoopFile is not a member of ... RDD[(String, MyObject)]

2015-02-12 Thread Imran Rashid
You need to import the implicit conversions to PairRDDFunctions with

import org.apache.spark.SparkContext._

(note that this requirement will go away in 1.3:
https://issues.apache.org/jira/browse/SPARK-4397)

On Thu, Feb 12, 2015 at 9:36 AM, Vladimir Protsenko protsenk...@gmail.com
wrote:

 Hi. I am stuck on how to save a file to HDFS from Spark.

 I have written MyOutputFormat extends FileOutputFormat[String, MyObject],
 and then in Spark I call this:

   rddres.saveAsHadoopFile[MyOutputFormat]("hdfs://localhost/output") or
   rddres.saveAsHadoopFile("hdfs://localhost/output", classOf[String],
     classOf[MyObject], classOf[MyOutputFormat])

 where rddres is an RDD[(String, MyObject)] produced earlier in the
 transformation pipeline.

 The compilation error is: value saveAsHadoopFile is not a member of
 org.apache.spark.rdd.RDD[(String, vlpr.MyObject)].

 Could someone give me some insight into what could be done to make this
 work? Why is it not a member? Because of wrong types?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsHadoopFile-is-not-a-member-of-RDD-String-MyObject-tp21627.html




Re: Shuffle on joining two RDDs

2015-02-12 Thread Imran Rashid
 to do?

 Thank you,

 Karlson

 PS: Sorry for sending this twice, I accidentally did not reply to the
 mailing list first.







Re: advice on diagnosing Spark stall for 1.5hr out of 3.5hr job?

2015-02-04 Thread Imran Rashid
Hi Michael,

judging from the logs, it seems that those tasks are just running for a really
long time.  If you have long running tasks, then you wouldn't expect the
driver to output anything while those tasks are working.

What is unusual is that there is no activity during all that time the tasks
are executing.  Are you sure you are looking at the activity of the
executors (the nodes that are actually running the tasks), and not the
activity of the driver node (the node where your main program lives, but
that doesn't do any of the distributed computation)?  It would be perfectly
normal for the driver node to be idle while all the executors were busy
with long running tasks.

I would look at:
(a) the cpu usage etc. of the executor nodes during those long running tasks
(b) the thread dumps of the executors during those long running tasks
(available via the UI under the Executors tab, or just log into the boxes
and run jstack).  Ideally this will point out a hotspot in your code that
is making these tasks take so long.  (Or perhaps it'll point out what is
going on in spark internals that is so slow)
(c) the summary metrics for the long running stage, when it finally
finishes (also available in the UI, under the Stages tab).  You will get
a breakdown of how much time is spent in various phases of the tasks, how
much data is read, etc., which can help you figure out why tasks are slow


Hopefully this will help you find out what is taking so long.  If you find
out the executors really aren't doing anything during these really long
tasks, it would be great to know, and maybe to get some more info for
a bug report.

Imran


On Tue, Feb 3, 2015 at 6:18 PM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 First, my sincere thanks to all who have given me advice.
 Following previous discussion, I've rearranged my code to try to keep the
 partitions to more manageable sizes.
 Thanks to all who commented.

 At the moment, the input set I'm trying to work with is about 90GB (avro
 parquet format).

 When I run on a reasonable chunk of the data (say half) things work
 reasonably.

 On the full data, the spark process stalls.
 That is, for about 1.5 hours out of a 3.5 hour run, I see no activity.
 No cpu usage, no error message, no network activity.
 It just seems to sit there.
 The messages bracketing the stall are shown below.

 Any advice on how to diagnose this?
 I don't get any error messages.
 The spark UI says that it is running a stage, but it makes no discernible
 progress.
 Ganglia shows no CPU usage or network activity.
 When I shell into the worker nodes there are no filled disks or other
 obvious problems.

 How can I discern what Spark is waiting for?

 The only weird thing seen, other than the stall, is that the yarn logs on
 the workers have lines with messages like this:
 2015-02-03 22:59:58,890 INFO
 org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl
 (Container Monitor): Memory usage of ProcessTree 13158 for container-id
 container_1422834185427_0083_01_21: 7.1 GB of 8.5 GB physical memory
 used; 7.6 GB of 42.5 GB virtual memory used

 It's rather strange that it mentions 42.5 GB of virtual memory.  The
 machines are EMR machines with 32 GB of physical memory and, as far as I
 can determine, no swap space.

 The messages bracketing the stall are shown below.


 Any advice is welcome.

 Thanks!

 Sincerely,
  Mike Albert

 Before the stall.
 15/02/03 21:45:28 INFO cluster.YarnClientClusterScheduler: Removed TaskSet
 5.0, whose tasks have all completed, from pool
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Stage 5
 (mapPartitionsWithIndex at Transposer.scala:147) finished in 4880.317 s
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: looking for newly runnable
 stages
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: running: Set(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: waiting: Set(Stage 6, Stage
 7, Stage 8)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: failed: Set()
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 6: List(Stage 3)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 7: List(Stage 6)
 15/02/03 21:45:28 INFO scheduler.DAGScheduler: Missing parents for Stage
 8: List(Stage 7)
 At this point, I see no activity for 1.5 hours except for this (XXX for
 I.P. address)
 15/02/03 22:13:24 INFO util.AkkaUtils: Connecting to ExecutorActor:
 akka.tcp://sparkExecutor@ip-XXX.ec2.internal:36301/user/ExecutorActor

 Then finally it started again:
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 1.0 in
 stage 3.0 (TID 7301) in 7208259 ms on ip-10-171-0-124.ec2.internal (3/4)
 15/02/03 23:31:34 INFO scheduler.TaskSetManager: Finished task 0.0 in
 stage 3.0 (TID 7300) in 7208503 ms on ip-10-171-0-128.ec2.internal (4/4)
 15/02/03 23:31:34 INFO scheduler.DAGScheduler: Stage 3 (mapPartitions at
 Transposer.scala:211) finished in 7209.534 s






Re: Spark SQL taking long time to print records from a table

2015-02-04 Thread Imran Rashid
Many operations in spark are lazy -- most likely your collect() statement
is actually forcing evaluation of several steps earlier in the pipeline.
The logs & the UI might give you some info about all the stages that are
being run when you get to collect().

I think collect() is just fine if you are trying to pull just one record to
the driver, that shouldn't be a bottleneck.
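
A small plain-Python analogy (generators, not Spark) shows why the time lands on the final step. The stage names and the executed log are invented for this sketch: defining the pipeline runs nothing, and the one call that plays the role of collect() triggers every upstream stage.

```python
executed = []  # records which stages actually ran, in the order they started


def source():
    """Stand-in for reading the input; runs only when something pulls from it."""
    executed.append("read")
    yield from range(1, 6)


def stage(name, fn, upstream):
    """Build a lazy stage that applies fn to each row of the upstream stage."""
    def run():
        executed.append(name)
        for row in upstream():
            yield fn(row)
    return run


# Defining the pipeline is cheap: no stage has run yet.
joined = stage("join", lambda x: (x, x * 10), source)
grouped = stage("group", lambda kv: kv[1], joined)
ran_before_collect = list(executed)

# The "collect()": only now does every stage do its work.
result = list(grouped())

print(ran_before_collect)  # []
print(sorted(executed))    # ['group', 'join', 'read']
print(result)              # [10, 20, 30, 40, 50]
```

Timing only the definitions would show almost nothing, while timing the final line would include all of the earlier stages.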

On Wed, Feb 4, 2015 at 1:32 AM, jguliani jasminkguli...@gmail.com wrote:

 I have 3 text files in hdfs which I am reading using spark sql and
 registering them as table. After that I am doing almost 5-6 operations -
 including joins , group by etc.. And this whole process is taking hardly
 6-7
 secs. ( Source File size - 3 GB with almost 20 million rows ).
 As a final step of my computation, I am expecting only 1 record in my final
 rdd - named as acctNPIScr in below code snippet.

 My question here is that when I am trying to print this rdd either by
 registering as table and printing records from table or by this method -
 acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println) - it is
 taking a very long time - almost 1.5 minutes to print 1 record.

 Can someone pls help me if I am doing something wrong in printing. What is
 the best way to print final result from schemardd.

 .
 val acctNPIScr = sqlContext.sql("SELECT party_id, " +
   "sum(npi_int)/sum(device_priority_new) as npi_score FROM AcctNPIScoreTemp " +
   "group by party_id")
 acctNPIScr.registerTempTable("AcctNPIScore")

 val endtime = System.currentTimeMillis()
 logger.info("Total sql Time : " + (endtime - st))   // this time is hardly 5 secs

 println("start printing")

 val result = sqlContext.sql("SELECT * FROM AcctNPIScore").collect().foreach(println)

 //acctNPIScr.map(t => "Score: " + t(1)).collect().foreach(println)

 logger.info("Total printing Time : " + (System.currentTimeMillis() - endtime)) // printing one record takes almost 1.5 minutes




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-taking-long-time-to-print-records-from-a-table-tp21493.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: 2GB limit for partitions?

2015-02-04 Thread Imran Rashid
Hi Mridul,


do you think you'll keep working on this, or should this get picked up by
others?  Looks like there was a lot of work put into LargeByteBuffer, seems
promising.

thanks,
Imran

On Tue, Feb 3, 2015 at 7:32 PM, Mridul Muralidharan mri...@gmail.com
wrote:

 That is fairly out of date (we used to run some of our jobs on it ... But
 that is forked off 1.1 actually).

 Regards
 Mridul


 On Tuesday, February 3, 2015, Imran Rashid iras...@cloudera.com wrote:

 Thanks for the explanations, makes sense.  For the record looks like this
 was worked on a while back (and maybe the work is even close to a
 solution?)

 https://issues.apache.org/jira/browse/SPARK-1476

 and perhaps an independent solution was worked on here?

 https://issues.apache.org/jira/browse/SPARK-1391


 On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

  cc dev list
 
 
  How are you saving the data? There are two relevant 2GB limits:
 
  1. Caching
 
  2. Shuffle
 
 
  For caching, a partition is turned into a single block.
 
  For shuffle, each map partition is partitioned into R blocks, where R =
  number of reduce tasks. It is unlikely a shuffle block > 2G, although it
  can still happen.
 
  I think the 2nd problem is easier to fix than the 1st, because we can
  handle that in the network transport layer. It'd require us to divide
 the
  transfer of a very large block into multiple smaller blocks.
 
 
 
  On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com
 wrote:
 
  Michael,
 
  you are right, there is definitely some limit at 2GB.  Here is a
 trivial
  example to demonstrate it:
 
  import org.apache.spark.storage.StorageLevel
  val d = sc.parallelize(1 to 1e6.toInt, 1)
    .map{i => new Array[Byte](5e3.toInt)}
    .persist(StorageLevel.DISK_ONLY)
  d.count()
 
  It gives the same error you are observing.  I was under the same
  impression as Sean about the limits only being on blocks, not
 partitions --
  but clearly that isn't the case here.
 
  I don't know the whole story yet, but I just wanted to at least let you
  know you aren't crazy :)
  At the very least this suggests that you might need to make smaller
  partitions for now.
 
  Imran
 
 
  On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
  m_albert...@yahoo.com.invalid wrote:
 
  Greetings!
 
  Thanks for the response.
 
  Below is an example of the exception I saw.
  I'd rather not post code at the moment, so I realize it is completely
  unreasonable to ask for a diagnosis.
  However, I will say that adding a partitionBy() was the last change
  before this error was created.
 
 
  Thanks for your time and any thoughts you might have.
 
  Sincerely,
   Mike
 
 
 
  Exception in thread "main" org.apache.spark.SparkException: Job aborted
  due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
  failure: Lost task 4.3 in stage 5.0 (TID 6012,
  ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
  java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
  at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
  at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
  at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
 
 
--
   *From:* Sean Owen so...@cloudera.com
  *To:* Michael Albert m_albert...@yahoo.com
  *Cc:* user@spark.apache.org user@spark.apache.org
  *Sent:* Monday, February 2, 2015 10:13 PM
  *Subject:* Re: 2GB limit for partitions?

 
  The limit is on blocks, not partitions. Partitions have many blocks.
 
  It sounds like you are creating very large values in memory, but I'm
  not sure given your description. You will run into problems if a
  single object is more than 2GB, of course. More of the stack trace
  might show what is mapping that much memory.
 
  If you simply want data into 1000 files it's a lot simpler. Just
  repartition into 1000 partitions and save the data. If you need more
  control over what goes into which partition

Re: Sort based shuffle not working properly?

2015-02-04 Thread Imran Rashid
I think you are interested in secondary sort, which is still being worked
on:

https://issues.apache.org/jira/browse/SPARK-3655
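
For what it's worth, here is a rough sketch of what "secondary sort" means for the sample data in the quoted question, written with plain Scala collections rather than Spark (the object and method names are made up for illustration). The point it tries to show: partitioning on c1 alone decides where a row goes, and ordering by (c1, c2) has to be a separate, explicit sort inside each partition. The shuffle manager setting does not promise that order to user code, as noted further down the thread.

```scala
object SecondarySortSketch {
  // Rows of (c1, c2, c3), copied from the sample data in the quoted question.
  val rows: Seq[(Int, Int, Int)] = Seq(
    (1, 2, 4), (1, 3, 6), (2, 4, 7), (2, 6, 8), (3, 5, 5), (3, 1, 8), (3, 2, 0))

  // Partition on c1 only, which is what the custom partitioner in the question does.
  def partitionOf(c1: Int, numPartitions: Int): Int = c1 % numPartitions

  // Group rows into partitions, then sort each partition by the composite key (c1, c2).
  // Keeping the key as a tuple of Ints gives numeric ordering, not the
  // string ordering you would get from a "c1,c2" key.
  def partitionAndSort(numPartitions: Int): Map[Int, Seq[(Int, Int, Int)]] =
    rows
      .groupBy(r => partitionOf(r._1, numPartitions))
      .map { case (p, rs) => p -> rs.sortBy(r => (r._1, r._2)) }
}
```

With 10 partitions, the rows for c1 = 3 come out as (3, 1), (3, 2), (3, 5). In Spark itself, repartitionAndSortWithinPartitions on a pair RDD (added in 1.2, if I recall correctly) is the closest existing building block; treat that pointer as a suggestion to check against the docs, not as the resolution of SPARK-3655.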

On Tue, Feb 3, 2015 at 4:41 PM, Nitin kak nitinkak...@gmail.com wrote:

 I thought that's what sort-based shuffle did: sort the keys going to the
 same partition.

 I have tried (c1, c2) as (Int, Int) tuple as well. I don't think that
 ordering of c2 type is the problem here.

 On Tue, Feb 3, 2015 at 5:21 PM, Sean Owen so...@cloudera.com wrote:

 Hm, I don't think the sort partitioner is going to cause the result to
 be ordered by c1,c2 if you only partitioned on c1. I mean, it's not
 even guaranteed that the type of c2 has an ordering, right?

 On Tue, Feb 3, 2015 at 3:38 PM, nitinkak001 nitinkak...@gmail.com
 wrote:
  I am trying to implement secondary sort in spark as we do in map-reduce.
 
  Here is my data (tab separated, without the c1, c2, c3 header).
  c1  c2  c3
  1   2   4
  1   3   6
  2   4   7
  2   6   8
  3   5   5
  3   1   8
  3   2   0
 
  To do secondary sort, I create a paired RDD as

  ((c1 + "," + c2), row)

  and then use a custom partitioner to partition only on c1. I have set
  spark.shuffle.manager = SORT so the keys per partition are sorted. For the
  key 3 I am expecting to get
  (3, 1)
  (3, 2)
  (3, 5)
  but I am still getting the original order
  3,5
  3,1
  3,2

  Here is the custom partitioner code:

  class StraightPartitioner(p: Int) extends org.apache.spark.Partitioner {
    def numPartitions = p
    def getPartition(key: Any) = {
      key.asInstanceOf[String].split(",")(0).toInt
    }
  }

  and driver code, please tell me what I am doing wrong

  val conf = new SparkConf().setAppName("MapInheritanceExample")
  conf.set("spark.shuffle.manager", "SORT");
  val sc = new SparkContext(conf)
  val pF = sc.textFile(inputFile)

  val log = LogFactory.getLog("MapFunctionTest")
  val partitionedRDD = pF.map { x =>

    var arr = x.split("\t");
    (arr(0)+","+arr(1), null)

  }.partitionBy(new StraightPartitioner(10))

  var outputRDD = partitionedRDD.mapPartitions(p => {
    p.map({ case(o, n) => {
        o
      }
    })
  })
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Sort-based-shuffle-not-working-properly-tp21487.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 





Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Thanks for the explanations, makes sense.  For the record looks like this
was worked on a while back (and maybe the work is even close to a solution?)

https://issues.apache.org/jira/browse/SPARK-1476

and perhaps an independent solution was worked on here?

https://issues.apache.org/jira/browse/SPARK-1391


On Tue, Feb 3, 2015 at 5:20 PM, Reynold Xin r...@databricks.com wrote:

 cc dev list


 How are you saving the data? There are two relevant 2GB limits:

 1. Caching

 2. Shuffle


 For caching, a partition is turned into a single block.

 For shuffle, each map partition is partitioned into R blocks, where R =
 number of reduce tasks. It is unlikely a shuffle block > 2G, although it
 can still happen.

 I think the 2nd problem is easier to fix than the 1st, because we can
 handle that in the network transport layer. It'd require us to divide the
 transfer of a very large block into multiple smaller blocks.



 On Tue, Feb 3, 2015 at 3:00 PM, Imran Rashid iras...@cloudera.com wrote:

 Michael,

 you are right, there is definitely some limit at 2GB.  Here is a trivial
 example to demonstrate it:

 import org.apache.spark.storage.StorageLevel
 val d = sc.parallelize(1 to 1e6.toInt, 1)
   .map{i => new Array[Byte](5e3.toInt)}
   .persist(StorageLevel.DISK_ONLY)
 d.count()

 It gives the same error you are observing.  I was under the same
 impression as Sean about the limits only being on blocks, not partitions --
 but clearly that isn't the case here.

 I don't know the whole story yet, but I just wanted to at least let you
 know you aren't crazy :)
 At the very least this suggests that you might need to make smaller
 partitions for now.

 Imran


 On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
 m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000

Re: 2GB limit for partitions?

2015-02-03 Thread Imran Rashid
Michael,

you are right, there is definitely some limit at 2GB.  Here is a trivial
example to demonstrate it:

import org.apache.spark.storage.StorageLevel
val d = sc.parallelize(1 to 1e6.toInt, 1)
  .map{i => new Array[Byte](5e3.toInt)}
  .persist(StorageLevel.DISK_ONLY)
d.count()

It gives the same error you are observing.  I was under the same impression
as Sean about the limits only being on blocks, not partitions -- but
clearly that isn't the case here.

I don't know the whole story yet, but I just wanted to at least let you
know you aren't crazy :)
At the very least this suggests that you might need to make smaller
partitions for now.
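
As a back-of-the-envelope check on why the example above trips the limit, here is the arithmetic in plain Scala. The object and method names are made up, the figures count payload bytes only (serialization overhead would add to them), and the assumption that a cached partition ends up as a single block mapped through an Int-sized buffer is my reading of the stack trace, not something I have verified in the code.

```scala
object PartitionSizeEstimate {
  val records: Long = 1e6.toLong        // 1 to 1e6 in the example above
  val bytesPerRecord: Long = 5e3.toLong // each record is an Array[Byte] of 5e3 bytes

  // Payload of the single partition in the example.
  val totalBytes: Long = records * bytesPerRecord

  // Assumed ceiling for one block: the "Size exceeds Integer.MAX_VALUE" in the trace.
  val maxBlockBytes: Long = Int.MaxValue.toLong

  // Smallest number of evenly sized partitions that keeps each one under the ceiling.
  def minPartitions(total: Long, maxBytes: Long): Long =
    (total + maxBytes - 1) / maxBytes
}
```

The single partition holds 5,000,000,000 bytes against a ceiling of 2,147,483,647, so at least 3 evenly sized partitions would be needed just to get under it. In practice you would want a comfortable margin, not the bare minimum.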

Imran


On Tue, Feb 3, 2015 at 4:58 AM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 Thanks for the response.

 Below is an example of the exception I saw.
 I'd rather not post code at the moment, so I realize it is completely
 unreasonable to ask for a diagnosis.
 However, I will say that adding a partitionBy() was the last change
 before this error was created.


 Thanks for your time and any thoughts you might have.

 Sincerely,
  Mike



 Exception in thread "main" org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 4 in stage 5.0 failed 4 times, most recent
 failure: Lost task 4.3 in stage 5.0 (TID 6012,
 ip-10-171-0-31.ec2.internal): java.lang.RuntimeException:
 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
 at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
 at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
 at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:307)
 at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


   --
  *From:* Sean Owen so...@cloudera.com
 *To:* Michael Albert m_albert...@yahoo.com
 *Cc:* user@spark.apache.org user@spark.apache.org
 *Sent:* Monday, February 2, 2015 10:13 PM
 *Subject:* Re: 2GB limit for partitions?

 The limit is on blocks, not partitions. Partitions have many blocks.

 It sounds like you are creating very large values in memory, but I'm
 not sure given your description. You will run into problems if a
 single object is more than 2GB, of course. More of the stack trace
 might show what is mapping that much memory.

 If you simply want data into 1000 files it's a lot simpler. Just
 repartition into 1000 partitions and save the data. If you need more
 control over what goes into which partition, use a Partitioner, yes.



 On Mon, Feb 2, 2015 at 8:40 PM, Michael Albert
 m_albert...@yahoo.com.invalid wrote:
  Greetings!
 
  SPARK-1476 says that there is a 2G limit for blocks.
  Is this the same as a 2G limit for partitions (or approximately so?)?
 
 
  What I had been attempting to do is the following.
  1) Start with a moderately large data set (currently about 100GB, but
  growing).
  2) Create about 1,000 files (yes, files) each representing a subset of
 the
  data.
 
  The current attempt I am working on is something like this.
  1) Do a map whose output key indicates which of the 1,000 files it
 will go
  into and whose value is what I will want to stick into the file.
  2) Partition the data and use the body of mapPartition to open a file and
  save the data.
 
  My apologies, this is actually embedded in a bigger mess, so I won't post
  it.
 
  However, I get errors telling me that there is an IllegalArgumentException:
  Size exceeds Integer.MAX_VALUE, with sun.nio.ch.FileChannelImpl.map at the
  top of the stack.  This leads me to think that I have hit the limit on
  partition and/or block size.
 
  Perhaps this is not a good way to do it?
 
  I suppose I could run 1,000 passes over the data, each time collecting
 the
  output for one of my 1,000 final files, but that seems likely to be
  painfully slow to run.
 
  Am I missing something?
 
  Admittedly, this is an odd use case
 
  Thanks!
 
  Sincerely,
   Mike Albert



Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

2015-01-28 Thread Imran Rashid
I'm not an expert on streaming, but I think you can't do anything like this
right now.  It seems like a very sensible use case, though, so I've created
a jira for it:

https://issues.apache.org/jira/browse/SPARK-5467
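
In the meantime, the bucketing logic described in the quoted question can at least be sketched outside of streaming. This is plain Scala over an in-memory Seq, not a Spark Streaming job; LogRow, bucketStart and the other names are hypothetical, and how the per-bucket counts are kept between batches (updateStateByKey might be one place to start) is left open.

```scala
object EventTimeBuckets {
  // Hypothetical record shape: a row type plus the timestamp parsed from the log line.
  case class LogRow(rowType: String, timestampMs: Long)

  val bucketMs: Long = 5000L  // 5-second buckets, as in the question
  val windowMs: Long = 60000L // aggregate over the last 60 seconds of log time

  // Start of the bucket that a log timestamp falls into.
  def bucketStart(timestampMs: Long): Long = (timestampMs / bucketMs) * bucketMs

  // Count rows per (bucket, rowType) using the log timestamp, not the arrival time,
  // so out-of-order or late rows still land in the bucket they belong to.
  def countsPerBucket(rows: Seq[LogRow]): Map[(Long, String), Int] =
    rows.groupBy(r => (bucketStart(r.timestampMs), r.rowType))
      .map { case (k, rs) => k -> rs.size }

  // Sum the buckets whose start lies inside the window ending at nowMs.
  def windowCounts(bucketCounts: Map[(Long, String), Int], nowMs: Long): Map[String, Int] =
    bucketCounts.toSeq
      .filter { case ((bucket, _), _) => bucket > nowMs - windowMs && bucket <= nowMs }
      .groupBy { case ((_, rowType), _) => rowType }
      .map { case (rowType, entries) => rowType -> entries.map(_._2).sum }
}
```

The design choice here is to key everything by a bucket derived from the record itself, so replaying a backlog after a restart produces the same buckets as reading the data live.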

On Wed, Jan 28, 2015 at 8:54 AM, YaoPau jonrgr...@gmail.com wrote:

 The TwitterPopularTags example works great: the Twitter firehose keeps
 messages pretty well in order by timestamp, and so to get the most popular
 hashtags over the last 60 seconds, reduceByKeyAndWindow works well.

 My stream pulls Apache weblogs from Kafka, and so it's not as simple:
 messages can pass through out-of-order, and if I take down my streaming
 process and start it up again, the Kafka index stays in place and now I
 might be consuming 10x of what I was consuming before in order to catch up
 to the current time.  In this case, reduceByKeyAndWindow won't work.

 I'd like my bucket size to be 5 seconds, and I'd like to do the same thing
 TwitterPopularTags is doing, except instead of hashtags I have row types,
 and instead of aggregating by 60 seconds of clock time I'd like to
 aggregate
 over all rows of that row type with a timestamp within 60 seconds of the
 current time.

 My thinking is to maintain state in an RDD and update it and persist it with
 each 2-second pass, but this also seems like it could get messy.  Any
 thoughts or examples that might help me?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKeyAndWindow-but-using-log-timestamps-instead-of-clock-seconds-tp21405.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Aggregations based on sort order

2015-01-23 Thread Imran Rashid
I'm not sure about this, but I suspect the answer is:  spark doesn't
guarantee a stable sort, nor does it plan to in the future, so the
implementation has more flexibility.

But you might be interested in the work being done on secondary sort, which
could give you the guarantees you want:
https://issues.apache.org/jira/browse/SPARK-3655
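
The quoted message below suggests ordering the merge by (hash_key, spill_index). Just to illustrate that idea, here is a toy version in plain Scala. It is not Spark's ExternalAppendOnlyMap; Spilled and spillIndex are hypothetical names, and it sorts on the key itself instead of its hash to keep the sketch short.

```scala
object SpillOrderedMerge {
  // One partial combiner: the values gathered for a key before a spill.
  // spillIndex is a hypothetical counter that is incremented on every spill.
  case class Spilled(key: String, spillIndex: Int, values: Vector[Int])

  // Merge partial combiners key by key, visiting them in spill order, so values
  // that were inserted in order are still in order after the merge.
  def merge(spills: Seq[Spilled]): Map[String, Vector[Int]] =
    spills
      .sortBy(s => (s.key, s.spillIndex))
      .foldLeft(Map.empty[String, Vector[Int]]) { (acc, s) =>
        acc.updated(s.key, acc.getOrElse(s.key, Vector.empty[Int]) ++ s.values)
      }
}
```

Feeding it the two spills for A from the example in either order yields the values 1, 2, 3, 4 for A, which is the invariant the question is after.
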
On Jan 19, 2015 4:52 PM, justin.uang justin.u...@gmail.com wrote:

 Hi,

 I am trying to aggregate a key based on some timestamp, and I believe that
 spilling to disk is changing the order of the data fed into the combiner.

 I have some timeseries data that is of the form: (key, date, other
 data)

 Partition 1
 (A, 2, ...)
 (B, 4, ...)
 (A, 1, ...)
 (A, 3, ...)
 (B, 6, ...)

 which I then partition by key, then sort within the partition:

 Partition 1
 (A, 1, ...)
 (A, 2, ...)
 (A, 3, ...)
 (A, 4, ...)

 Partition 2
 (B, 4, ...)
 (B, 6, ...)

 If I run a combineByKey with the same partitioner, then the items for each
 key will be fed into the ExternalAppendOnlyMap in the correct order.
 However, if I spill, then the time slices are spilled to disk as multiple
 partial combiners. When it's time to merge the spilled combiners for each
 key, the combiners are combined in the wrong order.

 For example, if during a groupByKey, [(A, 1, ...), (A, 2, ...)] and
 [(A, 3, ...), (A, 4, ...)] are spilled separately, it's possible that the
 combiners can be combined in the wrong order, like [(A, 3, ...), (A, 4,
 ...), (A, 1, ...), (A, 2, ...)], which invalidates the invariant that
 all the values for A are passed in order to the combiners.

 I'm not an expert, but I suspect that this is because we use a heap ordered
 by key when iterating, which doesn't retain the order of the spilled
 combiners. Perhaps we can order our mergeHeap by (hash_key, spill_index),
 where spill_index is incremented each time we spill? This would mean that
 we would pop and merge the combiners of each key in order, resulting in
 [(A, 1, ...), (A, 2, ...), (A, 3, ...), (A, 4, ...)].

 Thanks in advance for the help! If there is a way to do this already in
 Spark 1.2, can someone point it out to me?

 Best,

 Justin



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Aggregations-based-on-sort-order-tp21245.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: reading a csv dynamically

2015-01-22 Thread Imran Rashid
Spark can definitely process data with optional fields.  It kinda depends
on what you want to do with the results -- it's more of an object design /
knowing Scala types question.

E.g., Scala has a built-in type Option specifically for handling optional
data, which works nicely in pattern matching & functional programming.
Just to save myself some typing, I'm going to show an example with 2 or 3
fields:

val myProcessedRdd: RDD[(String, Double, Option[Double])] =
sc.textFile("file.csv").map{txt =>
  val split = txt.split(",")
  val opt = if (split.length == 3) Some(split(2).toDouble) else None
  (split(0), split(1).toDouble, opt)
}

then e.g., say in a later processing step, you want to make the 3rd field
have a default of 6.9, you'd do something like:

myProcessedRdd.map{ case (name, count, ageOpt) =>  // arbitrary variable names I'm just making up
  val age = ageOpt.getOrElse(6.9)
   ...
}

You might be interested in reading up on Scala's Option type, and how you
can use it.  There are a lot of other options too, e.g. the Either type if
you want to track 2 alternatives, Try for keeping track of errors, etc.
You can play around with all of them outside of Spark.  Of course you could
do similar things in Java as well without these types.  You just need to
write your own container for dealing w/ missing data, which could be really
simple in your use case.
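
For playing around outside of Spark, something like the following should work as a starting point. It is only a sketch: the object and method names are made up, and it assumes lines of the form "name,count" or "name,count,age" with no quoting or escaping.

```scala
import scala.util.Try

object OptionalCsvFields {
  // Parse "name,count" or "name,count,age"; the third field is optional.
  def parseLine(line: String): (String, Double, Option[Double]) = {
    val split = line.split(",")
    val opt = if (split.length == 3) Some(split(2).toDouble) else None
    (split(0), split(1).toDouble, opt)
  }

  // Same parse wrapped in Try, so a malformed line becomes a Failure
  // instead of throwing out of the middle of a job.
  def safeParse(line: String): Try[(String, Double, Option[Double])] =
    Try(parseLine(line))

  // Fall back to the default of 6.9 when the optional field is missing.
  def ageOrDefault(row: (String, Double, Option[Double])): Double =
    row._3.getOrElse(6.9)
}
```

The same parseLine function could then be passed to map on the RDD of lines, so the parsing logic can be tested without a cluster.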

I would advise against first creating a key w/ the number of fields, and
then doing a groupByKey.  Since you are only expecting 2 different lengths,
all the data will only go to two tasks, so this will not scale very well.
And though the data is now grouped by length, it's all in one RDD, so you've
still got to figure out what to do with both record lengths.

Imran


On Wed, Jan 21, 2015 at 6:46 PM, Pankaj Narang pankajnaran...@gmail.com
wrote:

 Yes I think you need to create one map first which will keep the number of
 values in every line. Now you can group all the records with same number of
 values. Now you know how many types of arrays you will have.


 val dataRDD = sc.textFile("file.csv")
 val dataLengthRDD = dataRDD.map(line => (line.split(",").length, line))
 val groupedData = dataLengthRDD.groupByKey()

 now you can process the groupedData as it will have arrays of length x in
 one RDD.

 groupByKey([numTasks])  When called on a dataset of (K, V) pairs, returns a
 dataset of (K, Iterable<V>) pairs.


 I hope this helps

 Regards
 Pankaj
 Infoshore Software
 India




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/reading-a-csv-dynamically-tp21304p21307.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: sparkcontext.objectFile return thousands of partitions

2015-01-22 Thread Imran Rashid
I think you should also just be able to provide an input format that never
splits the input data.  This has come up before on the list, but I couldn't
find it.*

I think this should work, but I can't try it out at the moment.  Can you
please try and let us know if it works?

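import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

class TextFormatNoSplits extends TextInputFormat {
  // never split a file, so each input file becomes exactly one partition
  override def isSplitable(fs: FileSystem, file: Path): Boolean = false
}

def textFileNoSplits(sc: SparkContext, path: String): RDD[String] = {
  // note this is just a copy of sc.textFile, with a different InputFormatClass
  sc.hadoopFile(path, classOf[TextFormatNoSplits], classOf[LongWritable],
    classOf[Text]).map(pair => pair._2.toString).setName(path)
}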


* yes I realize the irony given the recent discussion about mailing list
vs. stackoverflow ...

On Thu, Jan 22, 2015 at 11:01 AM, Sean Owen so...@cloudera.com wrote:

 Yes, that second argument is what I was referring to, but yes it's a
 *minimum*, oops, right. OK, you will want to coalesce then, indeed.

 On Thu, Jan 22, 2015 at 6:51 PM, Wang, Ningjun (LNG-NPV)
 ningjun.w...@lexisnexis.com wrote:
  > If you know that this number is too high you can request a number of
  > partitions when you read it.

  How to do that? Can you give a code snippet? I want to read it into 8
  partitions, so I do

  val rdd2 = sc.objectFile[LabeledPoint]("file:///tmp/mydir", 8)

  However rdd2 contains thousands of partitions instead of 8 partitions
 





Re: Can I save RDD to local file system and then read it back on spark cluster with multiple nodes?

2015-01-16 Thread Imran Rashid
I'm not positive, but I think this is very unlikely to work.

First, when you call sc.objectFile(...),  I think the *driver* will need to
know something about the file, eg to know how many tasks to create.  But it
won't even be able to see the file, since it only lives on the local
filesystem of the cluster nodes.

If you really wanted to, you could probably write out some small metadata
about the files and write your own version of objectFile that uses it.  But
I think there is a bigger conceptual issue.  You might not in general be
sure that you are running on the same nodes when you save the file, as when
you read it back in.  So the file might not be present on the local
filesystem for the active executors.  You might be able to guarantee it for
the specific cluster setup you have now, but it might limit you down the
road.

What are you trying to achieve?  There might be a better way.  I believe
writing to hdfs will usually write one local copy, so you'd still be doing
a local read when you reload the data.

Imran
On Jan 16, 2015 6:19 AM, Wang, Ningjun (LNG-NPV) 
ningjun.w...@lexisnexis.com wrote:

  I have asked this question before but get no answer. Asking again.



 Can I save RDD to the local file system and then read it back on a spark
 cluster with multiple nodes?



 rdd.saveAsObjectFile("file:///home/data/rdd1")

 val rdd2 = sc.objectFile("file:///home/data/rdd1")

 This works if the cluster has only one node. But my cluster has 3
 nodes and each node has a local dir called /home/data. Is the rdd saved to
 the local dir across all 3 nodes? If so, is sc.objectFile(…) smart enough
 to read the local dir on all 3 nodes and merge them into a single rdd?



 Ningjun





Re: Accumulators

2015-01-15 Thread Imran Rashid
Your understanding is basically correct.  Each task creates its own
local accumulator, and just those results get merged together.

However, there are some performance limitations to be aware of.  First you
need enough memory on the executors to build up whatever those intermediate
results are.  Second, all the work of *merging* the results from each task
is done by the *driver*.  So if there is a lot of stuff to merge, that can
be slow, as it's not distributed at all.
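
Here is a toy model of those two steps in plain Scala, in case it helps to see them side by side. It does not use Spark's Accumulator API at all; each inner Seq just stands in for the records one task sees, and the names are made up.

```scala
object AccumulatorModel {
  // The records seen by one task (one partition).
  type Partition = Seq[Map[String, Any]]

  // Step 1, on the executors: each task folds its own records into a local set.
  // Records without an "entityType" entry are skipped.
  def localResult(partition: Partition): Set[Any] =
    partition.flatMap(_.get("entityType")).toSet

  // Step 2, on the driver: per-task results are merged one after another.
  // This part is sequential, which is why a large merge can be slow.
  def driverMerge(taskResults: Seq[Set[Any]]): Set[Any] =
    taskResults.foldLeft(Set.empty[Any])(_ ++ _)
}
```

The cost to watch is the size of each task's local set and the number of tasks, since the driver pays for every one of those merges by itself.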

Hope that helps a little

Imran
On Jan 14, 2015 6:21 PM, Corey Nolet cjno...@gmail.com wrote:

 Just noticed an error in my wording.

 Should be  I'm assuming it's not immediately aggregating on the driver
 each time I call the += on the Accumulator.

 On Wed, Jan 14, 2015 at 9:19 PM, Corey Nolet cjno...@gmail.com wrote:

 What are the limitations of using Accumulators to get a union of a bunch
 of small sets?

 Let's say I have an RDD[Map[String,Any]] and I want to do:

 rdd.map(accumulator += Set(_.get("entityType").get))


 What implication does this have on performance? I'm assuming it's not
 immediately aggregating each time I call the += on the Accumulator. Is it
 doing a local combine and then occasionally sending the results on the
 current partition back to the driver?







Re: Reading one partition at a time

2015-01-13 Thread Imran Rashid
This looks reasonable to me.  As you've done, the important thing is just
to make isSplitable return false.

This shares a bit in common with the sc.wholeTextFiles method.  It sounds
like you really want something much simpler than what that is doing, but
you might be interested in looking at that for more inspiration.

On Sun, Jan 4, 2015 at 4:30 PM, Michael Albert 
m_albert...@yahoo.com.invalid wrote:

 Greetings!

 I would like to know if the code below will read one-partition-at-a-time,
 and whether I am reinventing the wheel.

 If I may explain, upstream code has managed (I hope) to save an RDD such
 that each partition file (e.g, part-r-0, part-r-1) contains exactly the
 data subset which I would like to repackage in a file of a non-hadoop
 format.  So what I want to do is something like mapPartitionsWithIndex on
 this data (which is stored in sequence files, SNAPPY compressed).  However,
 if I simply open the data set with sequenceFile(), the data is
 re-partitioned and I lose the partitioning I want. My intention is that in
 the closure passed to mapPartitionsWithIndex, I'll open an HDFS file and
 write the data from the partition in my desired format, one file for each
 input partition.
 The code below seems to work, I think.  Have I missed something bad?
 Thanks!

 -Mike

 class NonSplittingSequenceFileInputFormat[K, V]
   // extends org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[K, V] // XXX
   extends org.apache.hadoop.mapred.SequenceFileInputFormat[K, V] {
   override def isSplitable(
       // context: org.apache.hadoop.mapreduce.JobContext,
       // path: org.apache.hadoop.fs.Path) = false
       fs: org.apache.hadoop.fs.FileSystem,
       filename: org.apache.hadoop.fs.Path) = false
 }


 sc.hadoopFile(outPathPhase1,
   classOf[NonSplittingSequenceFileInputFormat[K, V]],
   classOf[K],
   classOf[V],
   1)
 }

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Multiple Filter Effiency

2014-12-16 Thread Imran Rashid
I think accumulators do exactly what you want.

(Scala syntax below, I'm just not familiar with the Java equivalent ...)

val f1counts = sc.accumulator(0)
val f2counts = sc.accumulator(0)
val f3counts = sc.accumulator(0)

textfile.foreach { s =>
  if (f1matches) f1counts += 1
  ...
}

Note that you could also do a normal map reduce even though a record might
match more than one filter.  In the Scala API you can use flatMap to output
zero or more records:

textfile.flatMap { s =>
  Seq(
    (if (f1matches) Some("f1" -> 1) else None),
    ...
  ).flatten
}.reduceByKey { _ + _ }
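
To make the flatMap idea concrete, here is a small sketch that runs on a plain
Scala collection instead of an RDD, so it can be tried without a cluster.  The
names MultiFilterCount, filters and countMatches are made up for illustration,
and the three predicates are only my reading of the filters in the question.
On an RDD, the final groupBy/map step would be reduceByKey(_ + _) as above.

```scala
object MultiFilterCount {
  // Hypothetical filters: each one sees the parsed fields (field1, field2, field3) of a line.
  val filters: Map[String, Array[Int] => Boolean] = Map(
    "f1" -> ((f: Array[Int]) => f(0) == 0 && f(1) == 0),
    "f2" -> ((f: Array[Int]) => f(0) == 0 && f(2) == 1),
    "f3" -> ((f: Array[Int]) => f(0) == 0 && f(2) == 0)
  )

  // One pass over the lines: every line emits ("total", 1) plus one
  // (filterName, 1) record per filter it matches; records are then summed by key.
  def countMatches(lines: Seq[String]): Map[String, Int] =
    lines.flatMap { line =>
      val fields = line.split("\\s+").drop(1).map(_.toInt) // drop the user column
      val matched = filters.collect { case (name, p) if p(fields) => name -> 1 }
      ("total" -> 1) +: matched.toSeq
    }.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
}
```

A line that matches two filters simply emits two records, which is why the
"a line could match two or more filters" concern does not rule out map/reduce.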
On Dec 16, 2014 2:07 AM, zkidkid zkid...@gmail.com wrote:

 Hi,
 Currently I am trying to count lines in a document using multiple filters.
 Let's say here is my document:

 //user field1 field2 field3
 user1 0 0 1
 user2 0 1 0
 user3 0 0 0

 I want to count the lines in user.log that match filters like these:

 Filter1: field1 == 0 && field2 == 0
 Filter2: field1 == 0 && field3 == 1
 Filter3: field1 == 0 && field3 == 0
 ...
 and the total number of lines.

 I have tried, and I found that I couldn't use group by or map then reduce
 because a line could match two or more filters.

 My idea now is to loop over each line and maintain an outside counter service.

 For example:

 JavaRDD<String> textFile = sc.textFile(hdfs, 10);
 long start = System.currentTimeMillis();

 textFile.foreach(new VoidFunction<String>() {
     public void call(String s) {
         for (MyFilter filter : MyFilters) {
             if (filter.match(s)) filter.increaseOwnCounter();
         }
     }
 });


 I would be happy if there is another way to do it; any help is appreciated.
 Thanks in advance.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Filter-Effiency-tp20701.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: NumberFormatException

2014-12-16 Thread Imran Rashid
wow, really weird.  My intuition is the same as everyone else's, some
unprintable character.  Here's a couple more debugging tricks I've used in
the past:

// set up accumulators to catch the bad rows as a side effect
val nBadRows = sc.accumulator(0)
val nGoodRows = sc.accumulator(0)
val badRows =
  sc.accumulableCollection(scala.collection.mutable.Set[String]())

// flatMap so that you can skip the bad rows

datastream.flatMap { str =>
  try {
    val strArray = str.trim().split(",")
    val result = (strArray(0).toInt, strArray(1).toInt)
    nGoodRows += 1
    Some(result)
  } catch {
    case _: NumberFormatException =>
      nBadRows += 1
      badRows += str
      None
  }
}.saveAsTextFile(...)


if (badRows.value.nonEmpty) {
  println(" BAD ROWS *")
  badRows.value.foreach { str =>
    // look at a bit more info from each string ... print out length and each
    // character one by one
    println(str)
    println(str.length)
    str.foreach { println }
    println()
  }
}

// if it is some data corruption that you just have to live with, you might
// leave the flatMap / try in place even when you're running it for real.
// But then you might want to add a little check that there aren't too many
// bad rows.  Note that the accumulator[Set] will run out of memory if there
// are really a ton of bad rows, in which case you might switch to a
// reservoir sample.

val badFrac = nBadRows.value / (nGoodRows.value + nBadRows.value.toDouble)
println(s"${nBadRows.value} bad rows; ${nGoodRows.value} good rows; ($badFrac) bad fraction")
if (badFrac > maxAllowedBadRows) {
  throw new RuntimeException("too many bad rows! " + badFrac)
}
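
Printing the characters one by one does not help much when the offending
character is invisible, so here is a minimal sketch (plain Scala, no Spark)
that prints code points instead.  BadRowInspector, codePoints and parse are
hypothetical names, and the byte order mark in the usage below is only one
assumed culprit; the same check exposes any other unprintable character.

```scala
import scala.util.Try

object BadRowInspector {
  // Render every character as its Unicode code point, so invisible characters
  // (byte order mark, zero-width space, ...) become visible.
  def codePoints(s: String): String =
    s.map(c => f"U+${c.toInt}%04X").mkString(" ")

  // Same parsing as in the question: Some((a, b)) on success, None on a bad row.
  def parse(line: String): Option[(Int, Int)] =
    Try {
      val parts = line.trim.split(",")
      (parts(0).toInt, parts(1).toInt)
    }.toOption
}
```

For example, BadRowInspector.parse("\uFEFF8,22") is None even though the line
prints as "8,22", because trim() only strips characters up to U+0020, and
codePoints("\uFEFF8") shows the extra U+FEFF in front of the digit.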




On Mon, Dec 15, 2014 at 3:49 PM, yu yuz1...@iastate.edu wrote:

 Hello, everyone

 I know 'NumberFormatException' is due to the reason that String can not be
 parsed properly, but I really can not find any mistakes for my code. I hope
 someone may kindly help me.
 My hdfs file is as follows:
 8,22
 3,11
 40,10
 49,47
 48,29
 24,28
 50,30
 33,56
 4,20
 30,38
 ...

 So each line contains an integer + "," + an integer + "\n"
 My code is as follows:
 object StreamMonitor {
   def main(args: Array[String]): Unit = {
     val myFunc = (str: String) => {
       val strArray = str.trim().split(",")
       (strArray(0).toInt, strArray(1).toInt)
     }
     val conf = new SparkConf().setAppName("StreamMonitor")
     val ssc = new StreamingContext(conf, Seconds(30))
     val datastream = ssc.textFileStream("/user/yu/streaminput")
     val newstream = datastream.map(myFunc)
     newstream.saveAsTextFiles("output/", "")
     ssc.start()
     ssc.awaitTermination()
   }
 }

 The exception info is:
 14/12/15 15:35:03 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
 (TID 0, h3): java.lang.NumberFormatException: For input string: 8


 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 java.lang.Integer.parseInt(Integer.java:492)
 java.lang.Integer.parseInt(Integer.java:527)

 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
 scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
 StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:9)
 StreamMonitor$$anonfun$1.apply(StreamMonitor.scala:7)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)


 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)


 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)


 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)


 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)

 So based on the above info, 8 is the first number in the file and I think
 it should be parsed to integer without any problems.
 I know it may be a very stupid question and the answer may be very easy.
 But
 I really can not find the reason. I am thankful to anyone who helps!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/NumberFormatException-tp20694.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: what is the best way to implement mini batches?

2014-12-15 Thread Imran Rashid
I'm a little confused by some of the responses.  It seems like there are
two different issues being discussed here:

1.  How to turn a sequential algorithm into something that works on spark.
Eg deal with the fact that data is split into partitions which are
processed in parallel (though within a partition, data is processed
sequentially).  I'm guessing folks are particularly interested in online
machine learning algos, which often have a point update and a mini batch
update.

2.  How to convert a one-point-at-a-time view of the data into a mini-batch
view of the data.

(2) is pretty straightforward, e.g. with iterator.grouped(batchSize), or
manually put data into your own buffer etc.  This works for creating mini
batches *within* one partition in the context of spark.
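
As a rough sketch of (2), assuming nothing beyond the standard library: the
helper below turns a record iterator (the kind of thing mapPartitions hands
you) into mini batches.  MiniBatches, miniBatches and batchMeans are made-up
names, and the per-batch "update" is just a mean standing in for whatever the
real algorithm does with a batch.

```scala
object MiniBatches {
  // Turn a one-record-at-a-time iterator into an iterator of mini batches;
  // the last batch may be smaller than batchSize.
  def miniBatches[T](records: Iterator[T], batchSize: Int): Iterator[Seq[T]] =
    records.grouped(batchSize)

  // Hypothetical per-batch update: here just the mean of each batch.
  def batchMeans(records: Iterator[Double], batchSize: Int): Iterator[Double] =
    miniBatches(records, batchSize).map(batch => batch.sum / batch.size)
}
```

In Spark this would be used as something like
rdd.mapPartitions(itr => MiniBatches.batchMeans(itr, batchSize)), and the
batches never cross a partition boundary.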

But problem (1) is completely separate, and there is no general solution.
It really depends on the specifics of what you're trying to do.

Some of the suggestions on this thread seem like they are basically just
falling back to sequential data processing ... but really inefficient
sequential processing.  E.g., it doesn't make sense to do a full scan of
your data with spark, and ignore all the records but the few that are in
the next mini batch.

It's completely reasonable to just sequentially process all the data if
that works for you.  But then it doesn't make sense to use spark, you're
not gaining anything from it.

Hope this helps, apologies if I just misunderstood the other suggested
solutions.
On Dec 14, 2014 8:35 PM, Earthson earthson...@gmail.com wrote:

 I think it could be done like:

 1. use mapPartitions to randomly drop some partitions
 2. drop some elements randomly (for the selected partitions)
 3. calculate the gradient step for the selected elements

 I don't think a fixed step is needed, but a fixed step could be done:

 1. zipWithIndex
 2. create a ShuffledRDD based on the index (e.g. using index/10 as key)
 3. use mapPartitions to calculate each batch

 I also have a question:

 Can mini batches run in parallel?
 I think running all batches in parallel is just like full-batch GD in some cases.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20677.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: what is the best way to implement mini batches?

2014-12-11 Thread Imran Rashid
Minor correction:  I think you want iterator.grouped(10) for
non-overlapping mini batches
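
The difference is easy to see on a small list.  This is only an illustration
with made-up names (WindowDemo, records), using the standard Iterator methods:

```scala
object WindowDemo {
  val records = List(1, 2, 3, 4, 5, 6)

  // sliding(3) advances one element at a time, so consecutive windows overlap
  val overlapping: List[Seq[Int]] = records.iterator.sliding(3).toList

  // grouped(3) advances a whole window at a time, so the batches are disjoint
  val disjoint: List[Seq[Int]] = records.iterator.grouped(3).toList
}
```

Here overlapping holds four windows (1 2 3, 2 3 4, 3 4 5, 4 5 6) while
disjoint holds two batches (1 2 3 and 4 5 6).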
On Dec 11, 2014 1:37 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 You can just do mapPartitions on the whole RDD, and then call sliding()
 on the iterator in each one to get a sliding window. One problem is that
 you will not be able to slide forward into the next partition at
 partition boundaries. If this matters to you, you need to do something more
 complicated to get those, such as the repartition that you mentioned (where
 you map each record to the partition it should be in).

 Matei

  On Dec 11, 2014, at 10:16 AM, ll duy.huynh@gmail.com wrote:
 
  any advice/comment on this would be much appreciated.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-implement-mini-batches-tp20264p20635.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 






Re: SPARK LIMITATION - more than one case class is not allowed !!

2014-12-05 Thread Imran Rashid
 It's an easy mistake to make... I wonder if an assertion could be
implemented that makes sure the type parameter is present.

We could use the NotNothing pattern

http://blog.evilmonkeylabs.com/2012/05/31/Forcing_Compiler_Nothing_checks/

but I wonder if it would just make the method signature very confusing for
the avg user ...


Re: optimize multiple filter operations

2014-11-29 Thread Imran Rashid
Rishi's approach will work, but it's worth mentioning that because all of
the data goes into only two groups, you will only process the resulting
data with two tasks and so you're losing almost all parallelism.
Presumably you're processing a lot of data, since you only want to do one
pass, so I doubt that would actually be helpful.

Unfortunately I don't think there is a better approach than doing two
passes currently.  Given some more info about the downstream processes,
there may be alternatives, but in general I think you are stuck.

Eg., here's a slight variation on Rishi's proposal, that may or may not
work:

initial.groupBy { x =>
  ((if (x == something) "key1" else "key2"), util.Random.nextInt(500))
}

which splits the data by a compound key -- first just a label of whether or
not it matches, and then subdivides into another 500 groups.  This will
result in nicely balanced tasks within each group, but also results in a
shuffle of all the data, which can be pretty expensive.  You might be
better off just doing two passes over the raw data.
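
To show what the compound key looks like, here is a sketch on a plain Scala
collection (so no shuffle is involved and it says nothing about cost).
SaltedGroupBy and saltedGroups are hypothetical names, and the Random is
passed in only so that a run can be made repeatable with a seed.

```scala
import scala.util.Random

object SaltedGroupBy {
  // Group by (label, salt): the label says which side of the filter a record
  // is on, the random salt spreads each side over up to nSalts groups.
  def saltedGroups(data: Seq[Int], something: Int, nSalts: Int, rng: Random): Map[(String, Int), Seq[Int]] =
    data.groupBy { x =>
      (if (x == something) "key1" else "key2", rng.nextInt(nSalts))
    }
}
```

Each side of the split can then be recovered by keeping the groups whose
label is "key1" or "key2" and ignoring the salt.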

Imran

On Fri, Nov 28, 2014 at 7:08 PM, Rishi Yadav ri...@infoobjects.com wrote:

 you can try (scala version => you convert to python)

 val set = initial.groupBy(x => if (x == something) "key1" else "key2")

 This would do one pass over original data.

 On Fri, Nov 28, 2014 at 8:21 AM, mrm ma...@skimlinks.com wrote:

 Hi,

 My question is:

 I have multiple filter operations where I split my initial rdd into two
 different groups. The two groups cover the whole initial set. In code,
 it's
 something like:

 set1 = initial.filter(lambda x: x == something)
 set2 = initial.filter(lambda x: x != something)

 By doing this, I am doing two passes over the data. Is there any way to
 optimise this to do it in a single pass?

 Note: I was trying to look in the mailing list to see if this question has
 been asked already, but could not find it.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/optimize-multiple-filter-operations-tp20010.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: Percentile

2014-11-29 Thread Imran Rashid
Hi Franco,

As a fast approximate way to get probability distributions, you might be
interested in t-digests:

https://github.com/tdunning/t-digest

In one pass, you could make a t-digest for each variable, to get its
distribution.  And after that, you could make another pass to map each data
point to its percentile in the distribution.

to create the tdigests, you would do something like this:

val myDataRDD = ...

myDataRDD.mapPartitions { itr =>
  val xDistribution = TDigest.createArrayDigest(32, 100)
  val yDistribution = TDigest.createArrayDigest(32, 100)
  ...
  itr.foreach { data =>
    xDistribution.add(data.x)
    yDistribution.add(data.y)
    ...
  }

  Seq(
    "x" -> xDistribution,
    "y" -> yDistribution
  ).toIterator.map { case (k, v) =>
    // serialize each digest to bytes so it can be shuffled
    val arr = new Array[Byte](v.byteSize)
    v.asBytes(ByteBuffer.wrap(arr))
    k -> arr
  }
}.reduceByKey { case (t1Arr, t2Arr) =>
  // deserialize, merge, and serialize the merged digest again
  val merged = ArrayDigest.fromBytes(ByteBuffer.wrap(t1Arr))
  merged.add(ArrayDigest.fromBytes(ByteBuffer.wrap(t2Arr)))
  val arr = new Array[Byte](merged.byteSize)
  merged.asBytes(ByteBuffer.wrap(arr))
  arr
}


(the complication there is just that tdigests are not directly
serializable, so I need to do the manual work of converting to and from an
array of bytes).
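
The second pass (mapping each data point to its percentile) is not shown
above.  As a hedged illustration of what that pass computes, here is an exact,
sort-based stand-in written against plain Scala collections; it is not the
t-digest API, and ExactPercentile, cdf and withPercentiles are names I made up.
With a real digest you would ask the merged digest for its cdf at each point
instead of searching a sorted copy of the data.

```scala
object ExactPercentile {
  // Fraction of values <= x, found by binary search in a sorted copy of the data.
  // This is an exact stand-in for the approximate cdf a digest would give.
  def cdf(sorted: Vector[Double])(x: Double): Double = {
    var lo = 0
    var hi = sorted.length
    while (lo < hi) {
      val mid = (lo + hi) / 2
      if (sorted(mid) <= x) lo = mid + 1 else hi = mid
    }
    lo.toDouble / sorted.length
  }

  // Second pass: pair every data point with its percentile, as a value in (0, 1].
  def withPercentiles(data: Seq[Double]): Seq[(Double, Double)] = {
    val sorted = data.sorted.toVector
    data.map(x => x -> cdf(sorted)(x))
  }
}
```

The exact version needs all the values in one place, which is precisely what
the digest avoids, so treat it as a reference for small data only.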


On Thu, Nov 27, 2014 at 9:28 AM, Franco Barrientos 
franco.barrien...@exalitica.com wrote:

 Hi folks!,



 Does anyone know how I can calculate, for each element of a variable in an
 RDD, its percentile? I tried to calculate it through Spark SQL with
 subqueries, but I think that is impossible in Spark SQL. Any idea will be
 welcome.



 Thanks in advance,



 *Franco Barrientos*
 Data Scientist

 Málaga #115, Of. 1003, Las Condes.
 Santiago, Chile.
 (+562)-29699649
 (+569)-76347893

 franco.barrien...@exalitica.com

 www.exalitica.com






Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
Hi Art,

I have some advice that isn't spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find it helpful.  I
think using an implicit to add your retrying behavior might be useful.  I
can think of two options:

1. enriching RDD itself, eg. to add a .retryForeach, which would have the
desired behavior.

2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even
within spark, you might realize you want to do something similar for more
than just foreach.

Here's an example.  (probably there is a more functional way to do this, to
avoid the while loop, but my brain isn't working and that's not the point
of this anyway)

Lets say we have this function:

def tenDiv(x:Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
at .tenDiv(<console>:7)


We can enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}



We activate this behavior by calling .retryable(nTries) on our method.
Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5


You could do the same thing on closures you pass to RDD.foreach.

I should add that I'm often very hesitant to use implicits because they can
make it harder to follow what's going on in the code.  I think this version
is OK, though, b/c somebody coming along later and looking at the code at
least can see the call to retryable as a clue.  (I really dislike
implicit conversions that happen without any hints in the actual code.)
Hopefully that's enough of a hint for others to figure out what is going
on.  E.g., intellij will know where that method came from and jump to it,
and also if you make the name unique enough, you can probably find it with
plain text search / c-tags.  But it's definitely worth considering for
yourself.

hope this helps,
Imran



On Thu, Jul 24, 2014 at 1:12 PM, Art Peel found...@gmail.com wrote:

 Our system works with RDDs generated from Hadoop files. It processes each
 record in a Hadoop file and for a subset of those records generates output
 that is written to an external system via RDD.foreach. There are no
 dependencies between the records that are processed.

 If writing to the external system fails (due to a detail of what is being
 written) and throws an exception, I see the following behavior:

 1. Spark retries the entire partition (thus wasting time and effort),
 reaches the problem record and fails again.
 2. It repeats step 1 up to the default 4 tries and then gives up. As a
 result, the rest of records from that Hadoop file are not processed.
 3. The executor where the 4th failure occurred is marked as failed and
 told to shut down and thus I lose a core for processing the remaining
 Hadoop files, thus slowing down the entire process.


 For this particular problem, I know how to prevent the underlying
 exception, but I'd still like to get a handle on error handling for future
 situations that I haven't yet encountered.

 My goal is this:
 Retry the problem record only (rather than starting over at the beginning
 of the partition) up to N times, then give up and move on to process the
 rest of the partition.

 As far as I can tell, I need to supply my own retry behavior and if I want
 to process records after the problem record I have to swallow exceptions
 inside the foreach block.

 My 2 questions are:
 1. Is there anything I can do to prevent the executor from being shut down
 when a failure occurs?


 2. Are there ways Spark can help me get closer to my goal of retrying only
 the problem record without writing my own re-try code and swallowing
 exceptions?

 Regards,
 Art




Re: continuing processing when errors occur

2014-07-24 Thread Imran Rashid
whoops!  just realized I was retrying the function even on success.  didn't
pay enough attention to the output from my calls.  Slightly updated
definitions:

class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}


def tenDiv(x: Int) = println(x + " --- " + (10 / x))


and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 --- -5
-1 --- -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 --- 10
2 --- 5
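
For what it's worth, one way to avoid the while loop is a small tail-recursive
helper built on scala.util.Try, which (like the NonFatal match) lets fatal
errors through.  This is only a sketch: FunctionalRetry and retry are made-up
names, it assumes nTries >= 1 (it always makes at least one attempt), and
returning the attempt count is my own addition to make the behavior easy to
check.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object FunctionalRetry {
  // Call f(a) up to nTries times, stopping at the first success.
  // Returns the number of attempts made and whether the last one succeeded.
  def retry[A](nTries: Int)(f: A => Unit)(a: A): (Int, Boolean) = {
    @tailrec
    def loop(attempt: Int): (Int, Boolean) =
      Try(f(a)) match {
        case Success(_) => (attempt, true)
        case Failure(ex) =>
          println(s"failed on input $a, try $attempt with $ex")
          if (attempt < nTries) loop(attempt + 1) else (attempt, false)
      }
    loop(1)
  }
}
```

Usage would look like
(-2 to 2).foreach(x => FunctionalRetry.retry(3)(tenDiv _)(x)), and the same
closure could be passed to RDD.foreach.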




