Yes, I see now. In the case of 3 days it's indeed possible; however, if I want
to hold a 30-day (or even bigger) block aggregation it will be a bit slow.

For the sake of history:
I've found several directions in which I can improve shuffling (from the video
https://www.youtube.com/watch?v=Wg2boMqLjCg), e.g. since I don't have cached
RDDs, I can try to increase spark.shuffle.memoryFraction from the default 0.2
to something bigger (even 0.6).
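
A minimal sketch of how I'd set it (assuming the Spark 1.x property names;
0.6/0.2 are just values to try, not a recommendation):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("daily-block-aggregation")
    // give more of the executor heap to shuffle buffers, since nothing is cached
    .set("spark.shuffle.memoryFraction", "0.6")
    // and shrink the storage fraction accordingly (its default is 0.6)
    .set("spark.storage.memoryFraction", "0.2")
  val sc = new SparkContext(conf)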
As for my initial question: there is a PR that tries to solve this issue (not
yet merged though), https://github.com/apache/spark/pull/4449, which
introduces a custom RDD with a custom InputFormat (based on HadoopRDD); I'll
try to do something similar.
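
The rough idea, as I understand it (my own sketch, not necessarily what the PR
does; it is only safe if the saved files preserve the partition layout exactly,
i.e. one unsplit file per partition, read back in the same order):

  import scala.reflect.ClassTag
  import org.apache.spark.{Partition, Partitioner, TaskContext}
  import org.apache.spark.rdd.RDD

  // Claims that `prev` (e.g. the reloaded baseBlockRdd) is already laid out
  // by `part`, so joins against RDDs using the same partitioner skip the
  // shuffle of the big side.
  class AssumePartitionedRDD[K: ClassTag, V: ClassTag](
      prev: RDD[(K, V)],
      part: Partitioner)
    extends RDD[(K, V)](prev) {

    require(prev.partitions.length == part.numPartitions,
      "partition count must match the assumed partitioner")

    override val partitioner: Option[Partitioner] = Some(part)

    override protected def getPartitions: Array[Partition] = prev.partitions

    override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] =
      prev.iterator(split, context)
  }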

Anyway, thanks for the ideas and help!


On 29 May 2015 at 18:01, ayan guha <guha.a...@gmail.com> wrote:

> My point is that if you keep daily aggregates already computed, then you do not
> reprocess raw data. But yeah, you may decide to recompute the last 3 days
> every day.
> On 29 May 2015 23:52, "Igor Berman" <igor.ber...@gmail.com> wrote:
>
>> Hi Ayan,
>> Thanks for the response.
>> I'm using 1.3.1. I'll check window queries (I don't use Spark SQL... only
>> core; maybe I should?).
>> What do you mean by materialized? I can repartitionAndSort the daily
>> aggregation by key; however, I don't quite understand how that will help
>> with yesterday's block, which needs to be loaded from a file and has no
>> connection to this repartitioning of the daily block.
>>
>>
>> On 29 May 2015 at 01:51, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Which version of Spark? In 1.4, window queries will show up for these
>>> kinds of scenarios.
>>>
>>> One thing I can suggest is to keep the daily aggregates materialised,
>>> partitioned by key and sorted by the key-day combination, using the
>>> repartitionAndSortWithinPartitions method. It allows you to use a custom
>>> partitioner and a custom sort order.
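>>>
>>> For example, something along these lines (DayKey, Counters and the
>>> partition count are made up for illustration):
>>>
>>>   import org.apache.spark.Partitioner
>>>   import org.apache.spark.rdd.RDD
>>>
>>>   // hypothetical composite key: entity id + day number
>>>   case class DayKey(id: Long, day: Int)
>>>   // hypothetical aggregate value type
>>>   case class Counters(values: Map[String, Long])
>>>
>>>   // custom partitioner: route by id only, so all days of one id land together
>>>   class IdPartitioner(val partitions: Int) extends Partitioner {
>>>     def numPartitions: Int = partitions
>>>     def getPartition(key: Any): Int = key match {
>>>       case DayKey(id, _) => (math.abs(id) % partitions).toInt
>>>     }
>>>     override def equals(other: Any): Boolean = other match {
>>>       case p: IdPartitioner => p.partitions == partitions
>>>       case _                => false
>>>     }
>>>     override def hashCode: Int = partitions
>>>   }
>>>
>>>   // the implicit ordering is the "custom sorter": sort by id, then by day
>>>   implicit val dayKeyOrdering: Ordering[DayKey] =
>>>     Ordering.by((k: DayKey) => (k.id, k.day))
>>>
>>>   def materialiseDaily(daily: RDD[(DayKey, Counters)]): RDD[(DayKey, Counters)] =
>>>     daily.repartitionAndSortWithinPartitions(new IdPartitioner(64))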
>>>
>>> Best
>>> Ayan
>>> On 29 May 2015 03:31, "igor.berman" <igor.ber...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I have a daily batch job that computes a daily aggregate of several
>>>> counters represented by some object.
>>>> After the daily aggregation is done, I want to compute a block aggregation
>>>> over several days (3, 7, 30, etc.).
>>>> To do so I need to add the new daily aggregation to the current block and
>>>> then subtract from the current block the daily aggregation of the oldest
>>>> day within the current block (a sliding window...).
>>>> I've implemented it with something like:
>>>>
>>>> baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)
>>>> All RDDs are keyed by a unique id (long). Each RDD is saved in Avro files
>>>> after the job finishes and loaded when the job starts (on the next day).
>>>> baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on
>>>> the size of the block).
>>>>
>>>> Unfortunately the performance is not satisfactory due to the many shuffles
>>>> (I have set parallelism etc.). I was looking for a way to improve
>>>> performance somehow, to make sure that one task "joins" the same local
>>>> keys without reshuffling baseBlockRdd (which is big) each time the job
>>>> starts (see https://spark-project.atlassian.net/browse/SPARK-1061 as a
>>>> related issue).
>>>> So, bottom line: how do I join a big RDD with a smaller RDD without
>>>> reshuffling the big RDD over and over again?
>>>> Once I've saved this big RDD and reloaded it from disk, I want every other
>>>> RDD to be partitioned and collocated by the same "partitioner" (which is
>>>> absent for HadoopRDD)... somehow, so that only the small RDDs will be sent
>>>> over the network.
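>>>>
>>>> Roughly what I mean, as a sketch (the partition count, persist level and
>>>> the subtractDaily/addDaily helpers are placeholders; this only avoids
>>>> re-shuffling the big RDD within a single run, since the partitioner is
>>>> lost once it is written out and reloaded via HadoopRDD):
>>>>
>>>>   import org.apache.spark.HashPartitioner
>>>>   import org.apache.spark.storage.StorageLevel
>>>>
>>>>   val part = new HashPartitioner(128)   // partition count is a guess
>>>>
>>>>   // pay the shuffle for the big block once, then keep it partitioned
>>>>   val partitionedBlock = baseBlockRdd   // RDD[(Long, Block)]
>>>>     .partitionBy(part)
>>>>     .persist(StorageLevel.MEMORY_AND_DISK)
>>>>
>>>>   // the joins below reuse partitionedBlock's partitioner, so only the
>>>>   // small lastDayRdd / newDayRdd sides are shuffled across the network
>>>>   // (mapValues preserves the partitioner, unlike map)
>>>>   val updatedBlock = partitionedBlock
>>>>     .leftOuterJoin(lastDayRdd)
>>>>     .mapValues { case (block, lastDay) => subtractDaily(block, lastDay) }
>>>>     .fullOuterJoin(newDayRdd)
>>>>     .mapValues { case (block, newDay) => addDaily(block, newDay) }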
>>>>
>>>> Another idea I had: somehow split baseBlock into 2 parts with a filter by
>>>> the keys of the small RDDs and then join; however, I'm not sure it's
>>>> possible to implement this filter without a join.
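>>>>
>>>> One way that filter might work without a join is to broadcast the keys of
>>>> the small RDDs (a sketch, assuming the set of affected ids is small enough
>>>> to collect to the driver; subtractDaily/addDaily are placeholders again):
>>>>
>>>>   // collect the ids touched by the new day and the dropped day,
>>>>   // and ship them to every executor
>>>>   val affectedIds = sc.broadcast(
>>>>     (newDayRdd.keys ++ lastDayRdd.keys).distinct().collect().toSet)
>>>>
>>>>   // split the big block with plain filters, no shuffle involved
>>>>   val touched   = baseBlockRdd.filter { case (id, _) => affectedIds.value.contains(id) }
>>>>   val untouched = baseBlockRdd.filter { case (id, _) => !affectedIds.value.contains(id) }
>>>>
>>>>   // join only the (much smaller) touched part, then stitch the block back
>>>>   // together; assumes addDaily returns the same block type
>>>>   val newBlock = touched
>>>>     .leftOuterJoin(lastDayRdd).mapValues { case (b, d) => subtractDaily(b, d) }
>>>>     .fullOuterJoin(newDayRdd).mapValues { case (b, d) => addDaily(b, d) }
>>>>     .union(untouched)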
>>>>
>>>> Any ideas would be appreciated,
>>>> thanks in advance.
>>>> Igor
>>>>
>>>>
>>>>
>>>>
>>
