Re: Batch aggregation by sliding window + join

2015-05-30 Thread Igor Berman
Yes, I see now. In the case of 3 days it's indeed possible; however, if I want
to hold a 30-day (or even bigger) block aggregation it will be a bit slow.

For the sake of history:
I've found several directions in which I can improve shuffling (from the video
https://www.youtube.com/watch?v=Wg2boMqLjCg), e.g. since I don't have cached
RDDs, I can try to increase spark.shuffle.memoryFraction from the default 0.2
to something bigger (even 0.6).
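For reference, a minimal sketch of what that configuration change could look
like in a Spark 1.x job (the app name is a placeholder, and lowering
spark.storage.memoryFraction is only my own assumption that follows from
having no cached RDDs):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("daily-block-aggregation")      // placeholder name
    // give the shuffle buffers more than the default 0.2 of the heap
    .set("spark.shuffle.memoryFraction", "0.6")
    // shrink the cache share accordingly, since nothing is cached (assumption)
    .set("spark.storage.memoryFraction", "0.2")
  val sc = new SparkContext(conf)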
As for my initial question - there is a PR that tries to solve this issue (not
yet merged though): https://github.com/apache/spark/pull/4449. It introduces a
custom RDD with a custom InputFormat (based on HadoopRDD); I'll try to do
something similar.

Anyway, thanks for the ideas and help!




Re: Batch aggregation by sliding window + join

2015-05-29 Thread ayan guha
My point is that if you keep the daily aggregates already computed, then you
do not reprocess the raw data. But yes, you may decide to recompute the last
3 days every day.
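A rough sketch of that idea, with assumed names and paths (Counters stands in
for the real counters object, and objectFile is used only for brevity in place
of the Avro loading from this thread):

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  type Counters = Map[String, Long] // placeholder for the real counters object

  def sumCounters(a: Counters, b: Counters): Counters =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  // Rebuild an N-day block from the already-computed daily aggregates only,
  // never from the raw data. `days` must be non-empty; the path is hypothetical.
  def blockFromDailies(sc: SparkContext, days: Seq[String]): RDD[(Long, Counters)] =
    days
      .map(day => sc.objectFile[(Long, Counters)](s"/aggregates/daily/$day"))
      .reduce(_ union _)
      .reduceByKey(sumCounters _)

Whether to slide the existing block or to rebuild it like this from a handful
of daily files is then mostly a question of how many days the block spans.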


Re: Batch aggregation by sliding window + join

2015-05-29 Thread Igor Berman
Hi Ayan,
Thanks for the response.
I'm using 1.3.1. I'll check window queries (I don't use Spark SQL, only
core - maybe I should?).
What do you mean by materialized? I can repartitionAndSortWithinPartitions the
daily aggregation by key, however I don't quite understand how that will help
with yesterday's block, which needs to be loaded from a file and has no
connection to this repartitioning of the daily block.




Re: Batch aggregation by sliding window + join

2015-05-28 Thread ayan guha
Which version of Spark? In 1.4, window queries will show up for these kinds
of scenarios.

One thing I can suggest is to keep the daily aggregates materialised,
partitioned by key and sorted by the key-day combination using the
repartitionAndSortWithinPartitions method. It allows you to use a custom
partitioner and custom sorting.
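A rough sketch of what this could look like, under assumed names and types
(Counters, IdPartitioner, the (id, day) key layout and the day encoding are
all placeholders, not something taken from this thread):

  import org.apache.spark.Partitioner
  import org.apache.spark.rdd.RDD

  type Counters = Map[String, Long] // placeholder aggregate type

  // Partition only by the counter id, so that every day of a given id lands in
  // the same partition; the implicit Ordering on (Long, Int) keys then sorts
  // each partition by (id, day). Keys are assumed to be (id, dayAsInt) pairs.
  class IdPartitioner(partitions: Int) extends Partitioner {
    override def numPartitions: Int = partitions
    override def getPartition(key: Any): Int = key match {
      case (id: Long, _) => ((id % partitions).toInt + partitions) % partitions
    }
  }

  def materialiseDaily(daily: RDD[((Long, Int), Counters)],
                       partitions: Int): RDD[((Long, Int), Counters)] =
    daily.repartitionAndSortWithinPartitions(new IdPartitioner(partitions))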

Best
Ayan


Batch aggregation by sliding window + join

2015-05-28 Thread igor.berman
Hi,
I have a daily batch job that computes a daily aggregate of several counters
represented by some object.
After the daily aggregation is done, I want to compute a block aggregation
over the last N days (3, 7, 30, etc.).
To do so I need to add the new daily aggregation to the current block and then
subtract from the current block the daily aggregation of the oldest day within
the current block (a sliding window).
I've implemented it with something like:

baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)

All RDDs are keyed by a unique id (long). Each RDD is saved to Avro files after
the job finishes and loaded when the job starts (on the next day). baseBlockRdd
is much larger than the lastDay and newDay RDDs (depending on the size of the
block).
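For concreteness, a minimal self-contained sketch of that update (the names
slideBlock, plus and minus are mine, and a plain Map[String, Long] stands in
for the real counters object):

  import org.apache.spark.rdd.RDD

  type Counters = Map[String, Long] // stand-in for the real counters object

  def plus(a: Counters, b: Counters): Counters =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  def minus(a: Counters, b: Counters): Counters =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) - v) }

  // Slide the N-day block by one day: subtract the day that drops out of the
  // window, then add the newest day (whose keys may not exist in the block yet).
  def slideBlock(baseBlockRdd: RDD[(Long, Counters)],
                 lastDayRdd:   RDD[(Long, Counters)],
                 newDayRdd:    RDD[(Long, Counters)]): RDD[(Long, Counters)] =
    baseBlockRdd
      .leftOuterJoin(lastDayRdd)
      .mapValues { case (block, old) => old.fold(block)(minus(block, _)) }
      .fullOuterJoin(newDayRdd)
      .mapValues { case (block, fresh) =>
        plus(block.getOrElse(Map.empty[String, Long]),
             fresh.getOrElse(Map.empty[String, Long]))
      }

Note that mapValues (rather than map) keeps whatever partitioning the joined
RDD has, which is relevant to the shuffling question below.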

Unfortunately the performance is not satisfactory due to the many shuffles
(I have set parallelism etc.). I was looking for a way to improve performance
somehow, i.e. to make sure that one task "joins" the same local keys without
reshuffling baseBlockRdd (which is big) each time the job starts (see
https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue).
So, bottom line: how do I join a big RDD with a smaller RDD without reshuffling
the big RDD over and over again?
Once I've saved this big RDD and reloaded it from disk, I want every other RDD
to be partitioned and co-located by the same "partitioner" (which is absent for
HadoopRDD), so that only the small RDDs are sent over the network.

Another idea I had: somehow split baseBlock into 2 parts by filtering on the
keys of the small RDDs and then join; however, I'm not sure it's possible to
implement this filter without a join.
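One hedged sketch of how that filter could be done without a join, assuming
the key set of the small RDDs is small enough to collect and broadcast
(splitByKeys and the variable names are mine, not from this thread):

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Split baseBlock into the part whose keys appear in a small RDD and the
  // rest, using a broadcast key set instead of a join; only the key set goes
  // over the network, baseBlock itself is filtered in place.
  def splitByKeys[V, W: ClassTag](sc: SparkContext,
                                  baseBlock: RDD[(Long, V)],
                                  small: RDD[(Long, W)]): (RDD[(Long, V)], RDD[(Long, V)]) = {
    val keys = sc.broadcast(small.keys.collect().toSet)
    val hit  = baseBlock.filter { case (k, _) => keys.value.contains(k) }
    val miss = baseBlock.filter { case (k, _) => !keys.value.contains(k) }
    (hit, miss)
  }

Only the "hit" part would then need to go through the joins, while the "miss"
part passes through unchanged; whether this is a win depends on how big the
collected key set gets.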

Any ideas would be appreciated.
Thanks in advance,
Igor



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org