Re: Theta sketch - concurrent union implementation
I did some thinking, and alternative 2 would not support the single-writer-multiple-readers scenario in Druid's incremental index, which is the common case. So this leaves a choice between alternatives 1 and 3.

Can anyone point out advantages of answering queries through a union API rather than through a sketch? The only reason I can think of is backward compatibility with the current implementation, but this might be a good enough reason.

On Sunday, June 30, 2019, 1:13:30 PM GMT+3, Eshcar Hillel wrote:
> [...]
Theta sketch - concurrent union implementation
Hi Everyone,

As some of you may recall, a year ago we had a conversation on the mailing list regarding the synchronization of sketches (https://lists.apache.org/thread.html/9899aa790a7eb561ab66f47b35c8f66ffe695432719251351339521a@%3Cdev.druid.apache.org%3E). The concurrent theta sketch implementation is now committed to the DataSketches library. Details of the design and API can be found here: https://datasketches.github.io/docs/Theta/ConcurrentThetaSketch.html.

We would like to continue by implementing a concurrent union operation. For this I have opened an issue suggesting three design alternatives: https://github.com/apache/incubator-datasketches-java/issues/263. Since Druid is one of the main users of DataSketches, and specifically of the union set operation, the input of the Druid community is valuable.

The advantage of a concurrent union implementation is that it is thread safe, namely it allows concurrent reads and updates of the union object. The application does not need to wrap the union implementation with a synchronized call, as is currently done in https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java.

The core concept of a concurrent implementation is separating the object into local objects and a shared object, where the data flows from local to shared. The three design alternatives differ in how they separate read and write accesses:
1) write only to local (union), read only from shared (union)
2) write and read only from local (union)
3) write only to local (union), read only from shared (sketch)

I would greatly appreciate your feedback in the issue I opened (https://github.com/apache/incubator-datasketches-java/issues/263) so we can make the best decision (also) for Druid.

Thanks,
Eshcar
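The local/shared separation described above can be illustrated with a small Java toy. It assumes an exact HashSet as a stand-in for the sketch or union, and the names SharedState and LocalWriter are hypothetical; this is not the DataSketches ConcurrentThetaSketch code. Each writer thread updates its own local buffer without locking and pushes the buffer into the shared object when it fills up, while readers only consult the shared object, as in alternatives 1 and 3. The shared object is still guarded by a lock here, but a writer takes it once per buffer rather than once per update.

```java
import java.util.HashSet;
import java.util.Set;

// Shared object: an exact HashSet stands in for the shared union/sketch (toy).
final class SharedState {
    private final Set<Long> items = new HashSet<>();

    // Propagation: data flows from a writer's local buffer into the shared object.
    synchronized void merge(Set<Long> local) {
        items.addAll(local);
    }

    // Readers only ever consult the shared object.
    synchronized long estimate() {
        return items.size();
    }
}

// Local object: one instance per writer thread, never shared, so no lock on update.
final class LocalWriter {
    private final SharedState shared;
    private final int bufferLimit;
    private final Set<Long> buffer = new HashSet<>();

    LocalWriter(SharedState shared, int bufferLimit) {
        this.shared = shared;
        this.bufferLimit = bufferLimit;
    }

    void update(long item) {
        buffer.add(item);
        if (buffer.size() >= bufferLimit) {
            flush();
        }
    }

    // Push buffered items to the shared object, then start a fresh local buffer.
    void flush() {
        shared.merge(buffer);
        buffer.clear();
    }
}
```

The price of this toy design is staleness: a reader can miss up to bufferLimit - 1 recent items per writer until that writer's next flush.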
Re: Question about sketches aggregation in druid
Thanks Himanshu! I will update when we have the ConcurrentUnion in the DataSketches library, or earlier if we get interesting performance results with the union implementations.

On Tuesday, July 24, 2018, 8:39:25 PM GMT+3, Himanshu wrote:
> This came up in the dev sync today. Here is the gist.
> - Union is necessary because we merge sketches in multiple cases, e.g. at query time, while persisting the final segment to be pushed to deep storage, and when indexing a user's data that itself contains sketches (e.g. someone ran batch pipelines with Pig etc. and created data that already has sketches in it).
> - SynchronizedUnion is necessary only because of the realtime indexing and querying use case, as Gian mentioned (it is a single-writer, multiple-readers use case). If we have a ConcurrentUnion implementation that performs as well as the non-thread-safe Union for a single thread, then we should totally be able to remove SynchronizedUnion.
> -- Himanshu
>
> On Sun, Jul 22, 2018 at 9:10 AM, Eshcar Hillel wrote:
> > [...]
Re: Question about sketches aggregation in druid
I think part of my confusion stems from the gap in the level of abstraction. Druid terminology focuses on aggregation, but in sketches there are two levels of aggregation:
- A sketch is a first-level aggregation, which holds the gist of the stream.
- A union is a second-level aggregation, which can aggregate sketches.

Adding two questions to the ones in my previous message. The locks are used to synchronize access to the union; I assume the union is a second-level aggregation merging sketches that are built during ingestion.
3) If this is not the case, then why does Druid apply a union and not simply use a sketch to aggregate the data?
4) If it is the case, then is it guaranteed that the merged sketches are immutable? Otherwise wrapping the union with locks is not enough.

I hope my questions make more sense now.

Thanks,
Eshcar

On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel wrote:
> [...]
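The two levels of aggregation distinguished above can be made concrete with a simplified KMV ("k minimum values") sketch, the idea on which theta sketches are built. This is a hypothetical toy, not the DataSketches API: KmvSketch absorbs raw items (first level), and KmvUnion merges whole sketches (second level) by keeping the k smallest hash values seen across all inputs. It assumes every input sketch uses the same k; real theta unions also handle inputs with different thetas.

```java
import java.util.TreeSet;

// Hypothetical toy KMV sketch; not the DataSketches API.
final class KmvSketch {
    final int k;
    // The k smallest distinct hash values seen so far, each in [0, 1).
    final TreeSet<Double> minHashes = new TreeSet<>();

    KmvSketch(int k) {
        this.k = k;
    }

    // First-level aggregation: absorb one raw stream item.
    void update(long item) {
        offer(hashToUnit(item));
    }

    // Keep h only if it is among the k smallest distinct values.
    void offer(double h) {
        if (minHashes.add(h) && minHashes.size() > k) {
            minHashes.pollLast();
        }
    }

    double estimate() {
        if (minHashes.size() < k) {
            return minHashes.size();           // exact mode: fewer than k distinct items
        }
        double theta = minHashes.last();       // k-th smallest hash value
        return (k - 1) / theta;
    }

    // SplitMix64 finalizer, mapped to a double in [0, 1) using the top 53 bits.
    static double hashToUnit(long x) {
        long z = x + 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        z = z ^ (z >>> 31);
        return (z >>> 11) * 0x1.0p-53;
    }
}

// Second-level aggregation: merges whole sketches rather than raw items.
final class KmvUnion {
    private final KmvSketch merged;

    KmvUnion(int k) {
        merged = new KmvSketch(k);
    }

    void update(KmvSketch sketch) {
        for (double h : sketch.minHashes) {
            merged.offer(h);
        }
    }

    double getEstimate() {
        return merged.estimate();
    }
}
```

Below k distinct items the toy is exact; above that, the estimate (k - 1) / theta has a relative standard error of roughly 1/sqrt(k).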
Re: Question about sketches aggregation in druid
Thanks Gian - I was missing the part about aggregation during ingestion-time roll-up. I looked at the SketchAggregator code and read the Druid overview document; let me verify that I got this right.

Consider the best-effort roll-up mode: as events arrive they are ingested into multiple segments, call these s0...s9, which should belong to a single segment. Then a roll-up process ru aggregates s0...s9 one by one (?) into a single segment. During the roll-up, ru can be queried and therefore needs to be thread safe.
1) Who is the "owner" of the roll-up process? What triggers the roll-up thread? Is it considered part of ingestion/indexing time, or is it done in the background as a kind of optimization?
2) The document says "Data is queryable as soon as it is ingested by the realtime processing logic." Does this mean that queries can call get on s0...s9? Should they be thread safe as well?

On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino wrote:
> Hi Eshcar,
>
> I don't think I 100% understand what you are asking, but I will say some things, and hopefully they will be helpful.
>
> In Druid we use aggregators for two things: aggregation during ingestion (for ingestion-time rollup) and aggregation during queries. During queries the aggregators are only ever used by one thread at a time. At ingestion time, "aggregate" and "get" can be called simultaneously. This happens because "aggregate" is called from an ingestion thread (because we update running aggregators during ingestion), and "get" is called by query threads (because they "get" those aggregator values from the ingestion aggregator object to feed them to a query aggregator object). These calls are not synchronized by Druid, so individual aggregators need to do it themselves if necessary.
>
> There was some effort to address this systematically in https://github.com/apache/incubator-druid/pull/3956, although it hasn't been finished yet. Check out some of the discussion on that patch for more background, and a question I just posted there: does it make more sense for ingestion-time aggregator thread safety to be handled systematically (at the IncrementalIndex), or for each aggregator to need to be thread safe?
>
> If you're looking at "aggregate" and "get" in this file, those are the two that could get called simultaneously:
> https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java
>
> On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel wrote:
> > [...]
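Gian's description, an ingestion thread calling "aggregate" while query threads call "get", maps onto a single-writer, multiple-reader lock. The following is a hypothetical sketch (DistinctAggregator is not Druid's Aggregator interface, and an exact HashSet stands in for the union) using ReentrantReadWriteLock from java.util.concurrent.locks: query threads can hold the read lock together, and the ingestion thread excludes them only while it updates. get() returns an unmodifiable copy, which bears on Eshcar's question about immutability: a lock around the union does not protect a result object that the writer keeps mutating after it has been handed out.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical aggregator with Druid-style "aggregate"/"get" names; not Druid's
// Aggregator interface. An exact HashSet stands in for the union state.
final class DistinctAggregator {
    private final Set<Long> state = new HashSet<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    // Called by the single ingestion thread; excludes readers while mutating.
    void aggregate(long value) {
        lock.writeLock().lock();
        try {
            state.add(value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Called by query threads; many may hold the read lock at once. Returns an
    // unmodifiable copy so later aggregate() calls cannot change what was handed out.
    Set<Long> get() {
        lock.readLock().lock();
        try {
            return Collections.unmodifiableSet(new HashSet<>(state));
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Copying on every get() costs time proportional to the state size; a real sketch would copy a compact, bounded-size result instead.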
Re: Question about sketches aggregation in druid
Hi Himanshu,

We did not experiment with the union operation, but these are toy-example throughput numbers we got for a *single* thread writing to a theta sketch: basic (sketches-core) 167M, lock-based 32M, our concurrent implementation 115M (223M with 2 threads, and scaling linearly). The experiments were done with a single sketch, and the results might be different with multiple sketches; however, they capture the potential gain of removing the lock.

Thanks,
Eshcar

On Wednesday, July 11, 2018, 2:26:53 AM GMT+3, Himanshu wrote:
> Yes, SynchronizedUnion was added on seeing a concurrency error on the processes doing realtime ingestion. On other processes it is not called concurrently, and the hope is that synchronization does not cause overhead when that piece of code is only called from one thread, because it gets optimized.
>
> On Tue, Jul 10, 2018 at 10:13 AM, Gian Merlino wrote:
> > Hi Eshcar,
> >
> > To my knowledge, in the Druid Aggregator and BufferAggregator interfaces, the main place where concurrency happens is that "aggregate" and "get" may be called simultaneously during realtime ingestion. So if there would be a benefit from improving concurrency, it would probably end up in that area.
> >
> > On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel wrote:
> > > [...]
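The numbers above come from the authors' own setup, and the message does not state their unit. As a rough way to probe Himanshu's hope that an uncontended lock is cheap in a single thread, one could time the same update with and without synchronized. This is a hypothetical harness (PlainAndLockedCounter and UpdateThroughput are illustrative names), not the benchmark behind those numbers; a plain timing loop is easily distorted by JIT warm-up and dead-code elimination, so JMH is the appropriate tool for real measurements.

```java
import java.util.function.LongConsumer;

// Hypothetical target with an unsynchronized and a synchronized update path.
final class PlainAndLockedCounter {
    long sum;

    void add(long v) {
        sum += v;
    }

    synchronized void addLocked(long v) {
        sum += v;
    }
}

// Rough single-thread throughput: n calls to update, reported as ops/second.
final class UpdateThroughput {
    static double opsPerSecond(LongConsumer update, long n) {
        long start = System.nanoTime();
        for (long i = 0; i < n; i++) {
            update.accept(i);
        }
        long elapsed = Math.max(1L, System.nanoTime() - start);
        return n * 1e9 / elapsed;
    }
}
```

Run each variant several times and discard the first rounds as warm-up before comparing; the JIT may reduce the cost of uncontended locking, which is exactly what such a comparison would check rather than assume.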
Re: Question about sketches aggregation in druid
Apologies, I must be missing something very basic in how incremental indexing works. A sketch is by itself an aggregator: it can absorb millions of updates before it exceeds its space limit or is flushed to disk. I assumed the ingestion thread aggregates data in multiple sketches in parallel, then at query time a union operation is invoked to merge the relevant sketches based on the attributes of the query, and when the union is completed its result is returned to the user. But in such a scenario there is no need to call get before the union is completed.

This means there is another scenario where a union is used and can be queried while it is still executing the merge. Is this to maintain some in-memory hierarchy of aggregations? Or to create the snapshots that are flushed to disk? A better understanding of the use case will help in presenting a better thread-safe solution.

Thanks,
Eshcar

On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino wrote:
> Hi Eshcar,
>
> > But even in a single-writer-single-reader scenario removing the lock can increase the throughput of accesses to the object.
>
> Definitely worth trying this out, imo.
>
> > However, I don't understand why the union object is read before the result is ready.
>
> It's used as part of incremental indexing: the idea is that we create aggregates during ingestion time and we want those to be queryable even while ingestion is still ongoing. So the ingestion thread will be calling "aggregate" and a query thread will be calling "get", potentially simultaneously.
>
> On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel wrote:
> > Thanks Gian,
> > This is also my understanding. But even in a single-writer-single-reader scenario removing the lock can increase the throughput of accesses to the object.
> > If the union is only used to produce the result at query time, then removing the lock would not affect ingestion throughput, but could decrease query latency. However, I don't understand why the union object is read before the result is ready.
> >
> > On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino wrote:
> > > [...]
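For the case Gian describes, one ingestion thread writing while query threads read, a lock-free alternative is to let the writer own its mutable state and publish immutable snapshots through a volatile field. The sketch below is a hypothetical illustration with a count/sum pair standing in for sketch state (Snapshot and SingleWriterAggregator are illustrative names); it is not how Druid's IncrementalIndex or the DataSketches concurrent sketch is implemented. Because each Snapshot is immutable and published through a volatile write, a reader never sees a half-applied update, and "get" never blocks "aggregate".

```java
// Immutable view handed to query threads; final fields make it safe to share.
final class Snapshot {
    final long count;
    final long sum;

    Snapshot(long count, long sum) {
        this.count = count;
        this.sum = sum;
    }
}

// Hypothetical single-writer aggregator: only one thread may call aggregate().
final class SingleWriterAggregator {
    private long count;   // written only by the ingestion thread
    private long sum;     // written only by the ingestion thread
    private volatile Snapshot published = new Snapshot(0, 0);

    // Ingestion thread only: update private state, then publish a new snapshot.
    void aggregate(long value) {
        count++;
        sum += value;
        published = new Snapshot(count, sum);
    }

    // Any query thread: a single volatile read, no lock.
    Snapshot get() {
        return published;
    }
}
```

Publishing on every update allocates one object per call; publishing every N updates instead trades allocation for staleness, loosely similar to the local-to-shared propagation in the concurrent theta sketch design.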
Question about sketches aggregation in druid
Hi All,

My name is Eshcar Hillel from Oath research. I'm currently working with Lee Rhodes on committing a new concurrent implementation of the theta sketch to the sketches-core library. I was wondering whether this implementation can help boost the union operation that is applied to multiple sketches at query time in Druid. From what I see in the code, the sketch aggregator uses the SynchronizedUnion implementation, which basically takes a lock on every single access (update/read) of the union operation. We believe a thread-safe implementation of the union operation can help decrease the inherent overhead of the lock.

I will be happy to join the meeting today and briefly discuss this option.

Thanks,
Eshcar
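The lock-per-access pattern described above can be sketched as a delegating wrapper in which every update and every read acquires the same monitor. The interface and class names here (CountingUnion, PlainUnion, SynchronizedWrapper) are hypothetical, an exact HashSet stands in for the union state, and Druid's actual SynchronizedUnion, which wraps the DataSketches Union, may differ in detail.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal union interface; not the DataSketches or Druid API.
interface CountingUnion {
    void update(long item);

    long getResult();
}

// Not thread safe: concurrent update() calls can corrupt the HashSet.
final class PlainUnion implements CountingUnion {
    private final Set<Long> seen = new HashSet<>();

    @Override
    public void update(long item) {
        seen.add(item);
    }

    @Override
    public long getResult() {
        return seen.size();
    }
}

// Lock-per-access wrapper: every update and every read takes the same monitor.
final class SynchronizedWrapper implements CountingUnion {
    private final CountingUnion delegate;

    SynchronizedWrapper(CountingUnion delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void update(long item) {
        delegate.update(item);
    }

    @Override
    public synchronized long getResult() {
        return delegate.getResult();
    }
}
```

Every call, including reads, serializes on one monitor, which is the overhead a concurrent union aims to reduce.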