The behavior is the same. I am not sure it's a problem so much as a
design decision. It does not require everything to stay in memory, only
the values for one key at a time. Have a look at how the preceding
shuffle works.
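
For instance (a toy sketch with made-up names), the result type already
shows that each key's values arrive as one whole collection rather than
a stream:

import org.apache.spark.rdd.RDD

// Toy sketch: the value side of each key is one fully materialized
// collection (a CompactBuffer under the hood), so a single very large
// key must fit in memory, even though different keys never need to be
// resident at the same time.
case class Record(hour: Int, timestamp: Long)
def byHour(records: RDD[Record]): RDD[(Int, Iterable[Record])] =
  records.groupBy(_.hour)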

Consider repartitionAndSortWithinPartitions to *partition* by hour and
then sort by time within each partition. Then you encounter your data
for each hour in order, as an Iterator, with mapPartitions.
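
A rough sketch of that approach (assuming epoch-millisecond timestamps;
Event, hourOf, writerForHour and the output path are made-up
placeholders, and the write step uses foreachPartition, the
side-effect-only sibling of mapPartitions):

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

case class Event(timestamp: Long, payload: String)
def hourOf(ts: Long): Long = ts / 3600000L  // hour bucket from epoch millis

// Route each (hour, timestamp) key by hour only, so all records of an
// hour land in the same partition; the shuffle then sorts every
// partition by the full (hour, timestamp) key.
class HourPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (hour: Long, _) => ((hour % partitions + partitions) % partitions).toInt
  }
}

// Hypothetical helper: open one output file per hour.
def writerForHour(hour: Long): java.io.PrintWriter =
  new java.io.PrintWriter(new java.io.File(s"/tmp/out/hour-$hour.txt"))

def writeByHour(events: RDD[Event], partitions: Int): Unit = {
  events
    .map(e => ((hourOf(e.timestamp), e.timestamp), e))  // key by (hour, time)
    .repartitionAndSortWithinPartitions(new HourPartitioner(partitions))
    .foreachPartition { iter =>
      // Records arrive sorted by hour, then timestamp, so we can stream
      // them to a writer and roll to a new file whenever the hour
      // changes, without ever holding a whole hour in memory.
      var currentHour = Long.MinValue
      var writer: java.io.PrintWriter = null
      iter.foreach { case ((hour, _), event) =>
        if (writer == null || hour != currentHour) {
          if (writer != null) writer.close()
          writer = writerForHour(hour)
          currentHour = hour
        }
        writer.println(event.payload)
      }
      if (writer != null) writer.close()
    }
}

The number of partitions does not have to match the number of hours:
several hours can share a partition, and the rolling writer simply
closes one file and opens the next when the hour changes.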

On Mon, Mar 30, 2015 at 10:06 AM, Mario Pastorelli
<mario.pastore...@teralytics.ch> wrote:
> We are experiencing some problems with the groupBy operation when it is
> used to group together data that will be written to the same file. The
> operation we want to do is the following: given some data with a
> timestamp, we want to sort it by timestamp, group it by hour and write
> one file per hour.
> One could do something like
>
> rdd.groupBy(hour).foreach { case (hour, group) =>
>     val writer = writerForHour(hour)
>     // group is the fully materialized collection of records for this hour
>     group.toSeq.sortBy(timestamp).foreach(writer.write)
>     writer.close()
> }
>
> but this will load all the data for one hour into memory and easily run
> out of memory. Originally we thought the problem was the toSeq, which
> makes strict the iterable that you obtain as the value from the groupBy,
> but apparently it is not. We removed the toSeq/sortBy step but we still
> get OOMs when the data in a group is huge.
> I saw that there has been a discussion on the ML about groupBy having to
> keep everything in memory at
> http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487
> but I found no solution to my problem.
>
> So my questions are the following:
>
> 1) is this groupBy problem still in Spark 1.3?
> 2) why does groupBy require everything to stay in memory? In my
> ignorance, I was convinced that groupBy worked with lazy Iterators
> instead of a strict Iterable; I think this is how mapPartitions works.
> The operation after the groupBy would then decide whether the iterator
> should be strict or not. So groupBy.foreach would be lazy, and every
> record could be passed directly to the foreach without waiting for the
> others. Is this not possible for some reason?
> 3) is there another way to do what I want to do? Keep in mind that I
> can't repartition, because the number of partitions depends dynamically
> on the number of years/days/hours. One solution could be to work at the
> minute level when there is too much data, but we still want to create
> one file per hour.
>
> Thanks!
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
