It worked, thank you.

On 30.03.2015 11:58, Sean Owen wrote:
The behavior is the same. I am not sure it's a problem so much as a
design decision. It does not require everything to stay in memory, only
the values for one key at a time. Have a look at how the preceding
shuffle works.

Consider repartitionAndSortWithinPartitions to *partition* by hour and
then sort by time. Then you encounter your data for each hour, in
order, in an Iterator with mapPartitions.
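
Something along these lines might work. This is only an untested
sketch: Event, HourPartitioner, writeOneFilePerHour and the output path
are placeholders to adapt to your own schema and storage, and it uses
foreachPartition rather than mapPartitions because the write is purely
a side effect.

import java.io.PrintWriter
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical record type; adapt to your own schema.
case class Event(hour: Long, timestamp: Long, payload: String)

// Route all records with the same (non-negative) hour to the same partition.
class HourPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case (hour: Long, _) => (hour % numPartitions).toInt
  }
}

def writeOneFilePerHour(events: RDD[Event], numPartitions: Int): Unit = {
  events
    // composite key: partition by hour, sort by (hour, timestamp)
    .map(e => ((e.hour, e.timestamp), e))
    .repartitionAndSortWithinPartitions(new HourPartitioner(numPartitions))
    .foreachPartition { iter =>
      // records arrive sorted by (hour, timestamp); roll to a new file
      // whenever the hour changes, so no whole hour is held in memory
      var currentHour = Long.MinValue
      var writer: PrintWriter = null
      iter.foreach { case ((hour, _), event) =>
        if (hour != currentHour) {
          if (writer != null) writer.close()
          writer = new PrintWriter(s"/path/to/output/hour-$hour.txt") // placeholder path
          currentHour = hour
        }
        writer.println(event.payload)
      }
      if (writer != null) writer.close()
    }
}

Several hours can safely share a partition because the sort key starts
with the hour, so the loop only has to notice the hour boundaries.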

On Mon, Mar 30, 2015 at 10:06 AM, Mario Pastorelli
<mario.pastore...@teralytics.ch> wrote:
We are experiencing some problems with the groupBy operation when it is
used to group together data that will be written to the same file. What
we want to do is the following: given some data with a timestamp, we
want to sort it by timestamp, group it by hour and write one file per hour.
One could do something like

rdd.groupBy(hour).foreach { case (hour, group) =>
  val writer = writerForHour(hour)
  group.toSeq.sortBy(timestamp).foreach(writer.write) // sort each hour's records by timestamp
  writer.close()
}

but this will load all the data for one hour into memory and easily runs
out of memory. Originally we thought the problem was the toSeq making
strict the iterable that you obtain as the value from the groupBy, but
apparently it is not: we removed the toSeq.sortBy and we still get OOM
when the data in a group is huge.
I saw that there has been a discussion on the ML about groupBy requiring
everything to stay in memory at
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html#a11487
but I found no solution to my problem.

So my questions are the following:

1) Is this groupBy problem still present in Spark 1.3?
2) Why does groupBy require everything to stay in memory? In my
ignorance, I was convinced that groupBy worked with lazy Iterators
instead of a strict Iterable, which I think is how mapPartitions works
(types sketched after these questions). The operation after the groupBy
would then decide whether the iterator should be strict or not, so
groupBy.foreach would be lazy and each record could be passed directly
to the foreach without waiting for the others. Is this not possible for
some reason?
3) Is there another way to do what I want? Keep in mind that I can't
simply repartition, because the number of partitions depends on the
number of years/days/hours. One solution could be to work at the minute
level when there is too much data, but we still want to create one file
per hour.
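
For reference, this is roughly how I picture the types involved (the
Event record here is just a made-up example):

import org.apache.spark.rdd.RDD

// Hypothetical record type, only to illustrate the signatures.
case class Event(hour: Long, timestamp: Long, payload: String)

def typeSketch(events: RDD[Event]): Unit = {
  // groupBy hands back a strict Iterable per key
  val grouped: RDD[(Long, Iterable[Event])] = events.groupBy(_.hour)
  // mapPartitions hands you a lazy Iterator over each partition instead
  val payloads: RDD[String] = events.mapPartitions(it => it.map(_.payload))
}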

Thanks!

