Re: Batch aggregation by sliding window + join
Yes, I see now. For a 3-day window it is indeed feasible, but if I want to hold a 30-day (or even bigger) block aggregation it will be a bit slow.

For the sake of history: I've found several directions for improving shuffling (from the talk https://www.youtube.com/watch?v=Wg2boMqLjCg), e.g. since I don't have cached RDDs, I can try to increase spark.shuffle.memoryFraction from the default 0.2 to something bigger (even 0.6).

As for my initial question, there is a PR that tries to solve this issue (not yet merged): https://github.com/apache/spark/pull/4449. It introduces a custom RDD with a custom InputFormat (based on HadoopRDD); I'll try to do something similar.

Anyway, thanks for the ideas and help!

On 29 May 2015 at 18:01, ayan guha wrote:
> My point is if you keep daily aggregates already computed then you do not
> reprocess raw data. But yeah, you may decide to recompute the last 3 days
> every day.
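A minimal sketch of the configuration change mentioned above, in Scala. The 0.6 shuffle fraction is the value from the thread; the app name is a placeholder, and lowering spark.storage.memoryFraction is an extra assumption that only makes sense here because the job caches no RDDs:

import org.apache.spark.{SparkConf, SparkContext}

// Give more of the executor heap to shuffle buffers (Spark 1.x settings).
val conf = new SparkConf()
  .setAppName("daily-block-aggregation")       // hypothetical app name
  .set("spark.shuffle.memoryFraction", "0.6")  // default is 0.2
  .set("spark.storage.memoryFraction", "0.3")  // default is 0.6; reduced since nothing is cached
val sc = new SparkContext(conf)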
Re: Batch aggregation by sliding window + join
My point is that if you keep the daily aggregates already computed, then you do not reprocess the raw data. But yeah, you may decide to recompute the last 3 days every day.

On 29 May 2015 23:52, "Igor Berman" wrote:
> Hi Ayan,
> thanks for the response.
> I'm using 1.3.1. I'll check window queries (I don't use Spark SQL, only
> core; maybe I should?).
> What do you mean by materialized? I can repartitionAndSort the daily
> aggregation by key, but I don't quite understand how that helps with
> yesterday's block, which needs to be loaded from a file and has no
> connection to this repartitioning of the daily block.
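A sketch of the idea above, assuming the daily aggregates are already written out per day. The path layout, the loading format (objectFile rather than the Avro files from the thread) and the plain Long counter value are all placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Load one precomputed daily aggregate, keyed by id.
def loadDaily(sc: SparkContext, day: String): RDD[(Long, Long)] =
  sc.objectFile[(Long, Long)](s"/aggregates/daily/$day")

// Rebuild an N-day block from the precomputed daily aggregates:
// one shuffle over the daily aggregates, none over the raw data.
def blockOfDays(sc: SparkContext, days: Seq[String]): RDD[(Long, Long)] =
  days
    .map(loadDaily(sc, _))
    .reduce(_ union _)
    .reduceByKey(_ + _)

For a 30-day window this still touches 30 small per-day files, which is usually much cheaper than reprocessing the raw data, and it can be combined with the incremental add/subtract approach from the original post.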
Re: Batch aggregation by sliding window + join
Hi Ayan,
thanks for the response.

I'm using 1.3.1. I'll check window queries (I don't use Spark SQL, only core; maybe I should?).

What do you mean by materialized? I can repartitionAndSort the daily aggregation by key, but I don't quite understand how that helps with yesterday's block, which needs to be loaded from a file and has no connection to this repartitioning of the daily block.

On 29 May 2015 at 01:51, ayan guha wrote:
> Which version of Spark? In 1.4, window queries will show up for these
> kinds of scenarios.
>
> One thing I can suggest is to keep the daily aggregates materialised,
> partitioned by key and sorted by the key-day combination using the
> repartitionAndSortWithinPartitions method. It allows you to use a custom
> partitioner and a custom sorter.
>
> Best
> Ayan
Re: Batch aggregation by sliding window + join
Which version of Spark? In 1.4, window queries will show up for these kinds of scenarios.

One thing I can suggest is to keep the daily aggregates materialised, partitioned by key and sorted by the key-day combination using the repartitionAndSortWithinPartitions method. It allows you to use a custom partitioner and a custom sorter.

Best
Ayan
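A sketch of this suggestion, assuming records keyed by (id, day) with a plain Long counter as the value; the partition count and output path are placeholders, and objectFile output stands in for the Avro files used in the job. Partitioning looks only at the id, so every day for a given id lands in the same partition, while rows within a partition are sorted by the full (id, day) key:

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

// Partition by id only, ignoring the day component of the key.
class IdOnlyPartitioner(partitions: Int) extends Partitioner {
  private val hash = new HashPartitioner(partitions)
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case (id: Long, _) => hash.getPartition(id)
    case other         => hash.getPartition(other)
  }
}

// Materialise the daily aggregates partitioned by id and sorted by (id, day).
def materializeDaily(daily: RDD[((Long, String), Long)], path: String): Unit =
  daily
    .repartitionAndSortWithinPartitions(new IdOnlyPartitioner(200))
    .saveAsObjectFile(path)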
Batch aggregation by sliding window + join
Hi,
I have a daily batch job that computes a daily aggregate of several counters represented by some object. After the daily aggregation is done, I want to compute a block aggregation over the last N days (3, 7, 30, etc.). To do so I need to add the new daily aggregation to the current block and then subtract from the current block the daily aggregation of the last day within the current block (a sliding window).

I've implemented it with something like:

baseBlockRdd.leftOuterJoin(lastDayRdd).map(subtraction).fullOuterJoin(newDayRdd).map(addition)

All RDDs are keyed by a unique id (long). Each RDD is saved to Avro files after the job finishes and loaded when the job starts (on the next day). baseBlockRdd is much larger than the lastDay and newDay RDDs (depending on the size of the block).

Unfortunately the performance is not satisfactory due to many shuffles (I have tuned parallelism etc.). I was looking for a way to improve performance, to make sure that one task "joins" the same local keys without reshuffling baseBlockRdd (which is big) each time the job starts (see https://spark-project.atlassian.net/browse/SPARK-1061 as a related issue). So, bottom line: how do I join a big RDD with a smaller RDD without reshuffling the big RDD over and over again? Once I've saved this big RDD and reloaded it from disk, I want every other RDD to be partitioned and collocated by the same "partitioner" (which is absent for a HadoopRDD), so that only the small RDDs are sent over the network.

Another idea I had: somehow split baseBlock into 2 parts by filtering on the keys of the small RDDs and then join, however I'm not sure it's possible to implement this filter without a join.

Any ideas would be appreciated,
thanks in advance
Igor

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Batch-aggregation-by-sliding-window-join-tp23074.html
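A minimal sketch of the sliding-window update described above, in Scala, assuming an id-to-counters layout with a hypothetical Counters type. The point is that all three RDDs get the same partitioner before joining, so the two joins become narrow and only the small daily RDDs move across the network. The partitioner is still lost when the block is written to files (a HadoopRDD carries no partitioner), so the big block pays one shuffle per run when partitionBy is reapplied; avoiding even that is exactly what SPARK-1061 and PR 4449 are about:

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical aggregate of the per-id counters.
case class Counters(clicks: Long, views: Long) {
  def +(o: Counters): Counters = Counters(clicks + o.clicks, views + o.views)
  def -(o: Counters): Counters = Counters(clicks - o.clicks, views - o.views)
}

def slideWindow(baseBlock: RDD[(Long, Counters)],
                lastDay: RDD[(Long, Counters)],
                newDay: RDD[(Long, Counters)],
                numPartitions: Int): RDD[(Long, Counters)] = {
  val part = new HashPartitioner(numPartitions)

  // Co-partition everything; the persisted base block is reused by both joins
  // without being reshuffled a second time.
  val base   = baseBlock.partitionBy(part).persist()
  val oldest = lastDay.partitionBy(part)
  val newest = newDay.partitionBy(part)

  base
    .leftOuterJoin(oldest)                 // subtract the day falling out of the window
    .mapValues { case (block, old) => old.fold(block)(block - _) }
    .fullOuterJoin(newest)                 // add the new day; new ids may appear
    .mapValues {
      case (Some(b), Some(f)) => b + f
      case (Some(b), None)    => b
      case (None, Some(f))    => f
      case (None, None)       => Counters(0L, 0L) // unreachable in a full outer join
    }
}

For the other idea in the post (splitting baseBlock by the keys of the small RDDs without a join), one possible approach is to collect the small RDDs' key sets, broadcast them and filter the big RDD; that avoids a join but only works when those key sets are small enough to fit in driver and executor memory.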