The behavior is the same. I'm not sure it's a problem so much as a design decision: it does not require everything to stay in memory, only the values for one key at a time. Have a look at how the preceding shuffle works.
Consider repartitionAndSortWithinPartitions to *partition* by hour and then sort by time. Then you encounter your data for an hour, in order, in an Iterator with mapPartitions.

On Mon, Mar 30, 2015 at 10:06 AM, Mario Pastorelli
<mario.pastore...@teralytics.ch> wrote:
> We are experiencing some problems with the groupBy operation when used to
> group together data that will be written to the same file. The operation
> we want to do is the following: given some data with a timestamp, we want
> to sort it by timestamp, group it by hour, and write one file per hour.
> One could do something like
>
> rdd.groupBy(hour).foreach { case (hour, group) =>
>   val writer = writerForHour(hour)
>   group.toSeq.sortBy(timestamp).foreach(writer.write)
>   writer.close()
> }
>
> but this loads all the data for one hour into memory and easily runs out
> of memory. Originally we thought it was a problem with the toSeq, which
> makes strict the Iterable that you obtain as the value from the groupBy,
> but apparently it is not. We removed the toSeq.sortBy, but we still get
> OOMs when the data in a group is huge.
> I saw that there has been a discussion on the ML about groupBy requiring
> everything to stay in memory at
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487
> but I found no solution to my problem.
>
> So my questions are the following:
>
> 1) Is this groupBy problem still present in Spark 1.3?
> 2) Why does groupBy require everything to stay in memory? In my ignorance,
> I was convinced that groupBy worked with lazy Iterators instead of a
> strict Iterable. I think this is how mapPartitions works. The operation
> after the groupBy would then decide whether the iterator should be strict
> or not. So groupBy.foreach would be lazy, and every record obtained by the
> foreach could be passed directly to it without waiting for the others. Is
> this not possible for some reason?
> 3) Is there another way to do what I want to do? Keep in mind that I
> can't repartition because the number of partitions is dynamic on the
> number of years/days/hours. One solution could be to work at the minute
> level when there is too much data, but we still want to create one file
> per hour.
>
> Thanks!
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> ---------------------------------------------------------------------
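The suggested approach above can be sketched roughly as follows. This is a minimal sketch, not code from this thread: the `Record` type, `hourOf`, `writerForHour`, and the choice of a `(hour, timestamp)` composite key are all assumptions for illustration.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class Record(timestamp: Long, payload: String)  // hypothetical record type

// Partition by hour only, ignoring the timestamp half of the key, so that
// all records for one hour land in the same partition.
class HourPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (hour: Long, _) => (math.abs(hour.hashCode) % partitions)
  }
}

def hourOf(ts: Long): Long = ts / 3600000L  // assuming millisecond timestamps

def writeByHour(rdd: RDD[Record], partitions: Int): Unit = {
  rdd
    // Composite key: partition on hour, but sort on (hour, timestamp).
    .map(r => ((hourOf(r.timestamp), r.timestamp), r))
    .repartitionAndSortWithinPartitions(new HourPartitioner(partitions))
    .foreachPartition { iter =>
      // Records now arrive grouped by hour and sorted by timestamp within
      // each hour, so only one writer is open at a time and no hour's data
      // ever needs to be held in memory at once.
      var currentHour: Option[Long] = None
      var writer: java.io.PrintWriter = null
      for (((hour, _), record) <- iter) {
        if (!currentHour.contains(hour)) {
          if (writer != null) writer.close()
          writer = writerForHour(hour)  // hypothetical: opens this hour's file
          currentHour = Some(hour)
        }
        writer.println(record.payload)
      }
      if (writer != null) writer.close()
    }
}
```

Note that a partition may contain several hours (which is fine, since they arrive contiguously), but no hour ever spans two partitions, which is what lets each file be written by a single streaming pass.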