Re: Theta sketch - concurrent union implementation

2019-07-02 Thread Eshcar Hillel
I did some thinking, and alternative 2 would not support the
single-writer-multiple-readers scenario in Druid's incremental index, which is
the common case. So this leaves a choice between alternatives 1 and 3. Can
anyone point out advantages of having a union API, rather than a sketch,
answer queries? The only reason I can think of is backward compatibility with
the current implementation, but this might be a good enough reason.

On Sunday, June 30, 2019, 1:13:30 PM GMT+3, Eshcar Hillel wrote:
 
 Hi Everyone,
As some of you may recall, a year ago we had a conversation on the mailing
list regarding the synchronization of sketches:
https://lists.apache.org/thread.html/9899aa790a7eb561ab66f47b35c8f66ffe695432719251351339521a@%3Cdev.druid.apache.org%3E
Currently, the implementation of the concurrent theta sketch is committed to the
DataSketches library. Details of the design and API can be found here:
https://datasketches.github.io/docs/Theta/ConcurrentThetaSketch.html
We would like to continue by implementing a concurrent union operation. For
this, I have opened an issue suggesting 3 design alternatives:
https://github.com/apache/incubator-datasketches-java/issues/263

With Druid being one of the main users of DataSketches, and specifically of the
union set operation, the input of the Druid community is valuable. The
advantage of a concurrent union implementation is that it is thread safe,
namely it allows concurrent reads and updates of the union object. The
application does not need to wrap the union implementation with a synchronized
call, as is currently done in
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java

The core concept of a concurrent implementation is separating the object into
local objects and a shared object, where the data flows from local to shared.
The 3 design alternatives suggest different separations of read and write
accesses:
1) write only to local (union), read only from shared (union)
2) write and read only from local (union)
3) write only to local (union), read only from shared (sketch)
I would greatly appreciate it if you could give your feedback in the issue I
opened, https://github.com/apache/incubator-datasketches-java/issues/263 , so we
can make the best decision (also) for Druid.
Thanks,
Eshcar

Theta sketch - concurrent union implementation

2019-06-30 Thread Eshcar Hillel
Hi Everyone,
As some of you may recall, a year ago we had a conversation on the mailing
list regarding the synchronization of sketches:
https://lists.apache.org/thread.html/9899aa790a7eb561ab66f47b35c8f66ffe695432719251351339521a@%3Cdev.druid.apache.org%3E
Currently, the implementation of the concurrent theta sketch is committed to the
DataSketches library. Details of the design and API can be found here:
https://datasketches.github.io/docs/Theta/ConcurrentThetaSketch.html
We would like to continue by implementing a concurrent union operation. For
this, I have opened an issue suggesting 3 design alternatives:
https://github.com/apache/incubator-datasketches-java/issues/263

With Druid being one of the main users of DataSketches, and specifically of the
union set operation, the input of the Druid community is valuable. The
advantage of a concurrent union implementation is that it is thread safe,
namely it allows concurrent reads and updates of the union object. The
application does not need to wrap the union implementation with a synchronized
call, as is currently done in
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregator.java

The core concept of a concurrent implementation is separating the object into
local objects and a shared object, where the data flows from local to shared.
The 3 design alternatives suggest different separations of read and write
accesses:
1) write only to local (union), read only from shared (union)
2) write and read only from local (union)
3) write only to local (union), read only from shared (sketch)
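
To make the local/shared split concrete, here is a toy Java model of alternative 1 (write only to local, read only from shared). It is a sketch under stated assumptions: exact HashSets stand in for theta sketches and unions, and LocalSharedUnion, propagate, and concurrentDemo are hypothetical names, not part of the DataSketches API. It also hints at why alternative 2 fits Druid's single-writer-multiple-readers case poorly: if reads went to a local object, a query thread would presumably see only its own local state rather than what the ingestion thread wrote.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of "write only to local, read only from shared" (alternative 1).
// Hypothetical class; exact sets stand in for theta sketches and unions.
class LocalSharedUnion {
    private final int bufferSize;
    private final Set<Long> shared = new HashSet<>(); // guarded by this object's monitor
    private final ThreadLocal<List<Long>> local =
        ThreadLocal.withInitial(ArrayList::new);      // one buffer per writer thread

    LocalSharedUnion(int bufferSize) { this.bufferSize = bufferSize; }

    // Writers append to their own buffer; the shared lock is taken only
    // once every bufferSize updates, when the buffer is propagated.
    void update(long value) {
        List<Long> buf = local.get();
        buf.add(value);
        if (buf.size() >= bufferSize) propagate(buf);
    }

    // Propagate whatever the calling thread still holds locally.
    void flushLocal() { propagate(local.get()); }

    private void propagate(List<Long> buf) {
        synchronized (this) { shared.addAll(buf); }
        buf.clear(); // buffer is thread-local, so no lock is needed here
    }

    // Readers see only the shared object, so results can lag values
    // still sitting in unflushed local buffers.
    synchronized int distinctCount() { return shared.size(); }

    // Several writer threads with overlapping value ranges.
    static int concurrentDemo(int threads, int perThread) {
        LocalSharedUnion union = new LocalSharedUnion(64);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final long offset = t * (long) (perThread / 2);
            Thread w = new Thread(() -> {
                for (long i = 0; i < perThread; i++) union.update(offset + i);
                union.flushLocal();
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return union.distinctCount();
    }
}
```

Alternative 3 would differ mainly on the read side: instead of reading the shared union, readers would read a compact sketch derived from it.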
I would greatly appreciate it if you could give your feedback in the issue I
opened, https://github.com/apache/incubator-datasketches-java/issues/263 , so we
can make the best decision (also) for Druid.
Thanks,
Eshcar

Re: Question about sketches aggregation in druid

2018-07-24 Thread Eshcar Hillel
Thanks Himanshu!
I will update when we have the ConcurrentUnion in the DataSketches library, or
earlier if we get interesting performance results with the union
implementations.

On Tuesday, July 24, 2018, 8:39:25 PM GMT+3, Himanshu wrote:

This came up in the dev sync today.

Here is the gist.

- Union is necessary because we merge sketches in multiple cases, e.g. at
query time, while persisting the final segment to be pushed to deep storage,
and when indexing user data that itself contains sketches (e.g. someone ran
batch pipelines with Pig etc. and created data that already has sketches in
it).
- SynchronizedUnion is necessary only because of the realtime indexing and
querying use case, as Gian mentioned (it is a single-writer-multiple-readers
use case). If we have a ConcurrentUnion implementation that performs as well
as the non-thread-safe Union for a single thread, then we should totally be
able to remove SynchronizedUnion.
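
Since realtime indexing is a single-writer-multiple-readers case, one lock-free pattern for that shape is for the writer to publish immutable snapshots through a volatile field, so readers never block the writer. The sketch below assumes a hypothetical SnapshotPublisher with a plain HashSet in place of a union; it is not how ConcurrentUnion is or would be implemented.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-writer / multiple-reader holder; a HashSet stands in
// for the union state. Not a DataSketches or Druid class.
class SnapshotPublisher {
    private final Set<Long> local = new HashSet<>(); // touched only by the writer
    private final int publishEvery;
    private int pending = 0;                          // writer-only counter
    private volatile Set<Long> shared = Set.of();     // immutable, read by queries

    SnapshotPublisher(int publishEvery) { this.publishEvery = publishEvery; }

    // Must be called from the single ingestion (writer) thread only.
    void update(long value) {
        local.add(value);
        if (++pending >= publishEvery) publish();
    }

    // Writer-side: copy and publish; the volatile write makes the copy
    // safely visible to reader threads.
    void publish() {
        shared = Set.copyOf(local);
        pending = 0;
    }

    // Lock-free read of the latest published snapshot.
    int distinctCount() { return shared.size(); }
}
```

The trade-off is bounded staleness: readers may lag the writer by fewer than publishEvery updates, and each publish pays for a full copy, which a real sketch would keep small.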

-- Himanshu

On Sun, Jul 22, 2018 at 9:10 AM, Eshcar Hillel 
wrote:

> I think part of my confusion stems from the gap in the level of
> abstraction. Druid terminology focuses on aggregation, but in sketches there
> are two levels of aggregation: a sketch is a first-level aggregation, which
> holds the gist of the stream, and a union is a second-level aggregation,
> which can aggregate sketches.
> Adding 2 questions to the questions below:
> The locks are used to synchronize access to the union. I assume the
> union is a second-level aggregation merging sketches that are built during
> ingestion.
> 3) If this is not the case, then why does Druid apply a union and not
> simply use a sketch to aggregate the data?
> 4) If it is the case, then is it guaranteed that the merged sketches are
> immutable? Otherwise, wrapping the union with locks is not enough.
>
> I hope my questions make more sense now.
> Thanks,
> Eshcar
>
>    On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel <
> esh...@oath.com> wrote:
>
> Thanks Gian - I was missing the part about aggregation during ingestion-time
> roll-up.
> I looked at the SketchAggregator code and read the Druid overview document;
> let me verify that I got this right. Consider the best-effort roll-up mode:
> as events arrive they are ingested into multiple segments, call these
> s0...s9, which should all belong to a single segment. Then a roll-up process
> ru aggregates s0...s9 one-by-one (?) into a single segment. During the
> roll-up, ru can be queried and therefore needs to be thread safe.
> 1) Who is the "owner" of the roll-up process? What triggers the roll-up
> thread? Is it considered part of the ingestion/indexing time, or is it
> done in the background as a kind of optimization?
>
> 2) The document says "Data is queryable as soon as it is ingested by the
> realtime processing logic." Does this mean that queries can apply get to
> s0...s9? Should they be thread safe as well?
>
>
>    On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino <
> g...@apache.org> wrote:
>
>  Hi Eshcar,
>
> I don't think I 100% understand what you are asking, but I will say some
> things, and hopefully they will be helpful.
>
> In Druid we use aggregators for two things: aggregation during ingestion
> (for ingestion-time rollup) and aggregation during queries. During queries
> the aggregators are only ever used by one thread at a time. At ingestion
> time, "aggregate" and "get" can be called simultaneously. It happens
> because "aggregate" is called from an ingestion thread (because we update
> running aggregators during ingestion), and "get" is called by query threads
> (because they "get" those aggregator values from the ingestion aggregator
> object to feed them to a query aggregator object). These calls are not
> synchronized by Druid, so individual aggregators need to do it themselves
> if necessary. There was some effort to address this systematically:
> https://github.com/apache/incubator-druid/pull/3956, although it hasn't
> been finished yet. Check out some of the discussion on that patch for more
> background, and a question I just posted there: does it make more sense for
> ingestion-time aggregator thread-safety to be handled systematically (at
> the IncrementalIndex) or for each aggregator to need to be thread safe?
>
> If you're looking at "aggregate" and "get" in this file, those are the two
> that could get called simultaneously:
> https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

Re: Question about sketches aggregation in druid

2018-07-22 Thread Eshcar Hillel
I think part of my confusion stems from the gap in the level of abstraction.
Druid terminology focuses on aggregation, but in sketches there are two levels
of aggregation: a sketch is a first-level aggregation, which holds the gist of
the stream, and a union is a second-level aggregation, which can aggregate
sketches.
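
The two levels can be illustrated with a toy KMV-style theta sketch: the first level keeps the k smallest hash values seen in a stream, and the second level (the union) merges sketches by taking the smaller theta and re-trimming to k entries. ToyThetaSketch, ToyThetaUnion, and the SplitMix64-style hash are assumptions for illustration only, not the DataSketches classes or its hash function.

```java
import java.util.TreeSet;

// First-level aggregation: a toy KMV-style theta sketch over a stream.
class ToyThetaSketch {
    final int k;
    final TreeSet<Double> hashes = new TreeSet<>(); // retained hash values, all < theta
    double theta = 1.0;                             // 1.0 means exact mode

    ToyThetaSketch(int k) { this.k = k; }

    // SplitMix64-style mixer mapped to [0, 1); a stand-in for a real sketch hash.
    static double hash(long item) {
        long z = item + 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        z = z ^ (z >>> 31);
        return (z >>> 11) * 0x1.0p-53;
    }

    void update(long item) { offer(hash(item)); }

    // Keep at most k hashes; evicting the largest one lowers theta.
    void offer(double h) {
        if (h >= theta) return;
        hashes.add(h);
        if (hashes.size() > k) theta = hashes.pollLast();
    }

    double getEstimate() { return hashes.size() / theta; }
}

// Second-level aggregation: a union that merges sketches, not raw items.
class ToyThetaUnion {
    private final ToyThetaSketch gadget;

    ToyThetaUnion(int k) { gadget = new ToyThetaSketch(k); }

    void union(ToyThetaSketch s) {
        gadget.theta = Math.min(gadget.theta, s.theta);
        gadget.hashes.tailSet(gadget.theta, true).clear(); // drop hashes >= new theta
        for (double h : s.hashes) gadget.offer(h);
    }

    double getEstimate() { return gadget.getEstimate(); }
}
```

In this model the union only ever sees retained hashes, so merging sketches built elsewhere (at query time, or from pre-sketched batch data) is the union's job, while the sketch itself absorbs raw updates.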
Adding 2 questions to the questions below:
The locks are used to synchronize access to the union. I assume the union is a
second-level aggregation merging sketches that are built during ingestion.
3) If this is not the case, then why does Druid apply a union and not simply
use a sketch to aggregate the data?
4) If it is the case, then is it guaranteed that the merged sketches are
immutable? Otherwise, wrapping the union with locks is not enough.

I hope my questions make more sense now.
Thanks,
Eshcar

On Sunday, July 22, 2018, 4:15:02 PM GMT+3, Eshcar Hillel wrote:

Thanks Gian - I was missing the part about aggregation during ingestion-time
roll-up.
I looked at the SketchAggregator code and read the Druid overview document;
let me verify that I got this right. Consider the best-effort roll-up mode: as
events arrive they are ingested into multiple segments, call these s0...s9,
which should all belong to a single segment. Then a roll-up process ru
aggregates s0...s9 one-by-one (?) into a single segment. During the roll-up, ru
can be queried and therefore needs to be thread safe.
1) Who is the "owner" of the roll-up process? What triggers the roll-up thread?
Is it considered part of the ingestion/indexing time, or is it done in the
background as a kind of optimization?

2) The document says "Data is queryable as soon as it is ingested by the
realtime processing logic." Does this mean that queries can apply get to
s0...s9? Should they be thread safe as well?
 

On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino wrote:

Hi Eshcar,

I don't think I 100% understand what you are asking, but I will say some
things, and hopefully they will be helpful.

In Druid we use aggregators for two things: aggregation during ingestion
(for ingestion-time rollup) and aggregation during queries. During queries
the aggregators are only ever used by one thread at a time. At ingestion
time, "aggregate" and "get" can be called simultaneously. It happens
because "aggregate" is called from an ingestion thread (because we update
running aggregators during ingestion), and "get" is called by query threads
(because they "get" those aggregator values from the ingestion aggregator
object to feed them to a query aggregator object). These calls are not
synchronized by Druid, so individual aggregators need to do it themselves
if necessary. There was some effort to address this systematically:
https://github.com/apache/incubator-druid/pull/3956, although it hasn't
been finished yet. Check out some of the discussion on that patch for more
background, and a question I just posted there: does it make more sense for
ingestion-time aggregator thread-safety to be handled systematically (at
the IncrementalIndex) or for each aggregator to need to be thread safe?
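
A minimal sketch of the situation described above: one ingestion thread calls aggregate while a query thread calls get on the same object, and because Druid does not synchronize these calls, the aggregator takes a lock itself. LockedDistinctAggregator and its aggregate(long)/get() signatures are hypothetical and simplified (they are not Druid's Aggregator interface), and a HashSet stands in for the union.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified aggregator: aggregate() runs on the ingestion
// thread while get() may run concurrently on query threads.
class LockedDistinctAggregator {
    private final Set<Long> seen = new HashSet<>(); // not thread safe by itself

    // Both methods take the same monitor, in the spirit of a synchronized union.
    synchronized void aggregate(long value) { seen.add(value); }

    // Copying iterates the set; without the lock, a concurrent add could make
    // this throw ConcurrentModificationException or copy an inconsistent state.
    synchronized Set<Long> get() { return new HashSet<>(seen); }

    static int runDemo(int rows, int distinct) {
        LockedDistinctAggregator agg = new LockedDistinctAggregator();
        Thread ingest = new Thread(() -> {
            for (int i = 0; i < rows; i++) agg.aggregate(i % distinct);
        });
        Thread query = new Thread(() -> {
            for (int i = 0; i < 1_000; i++) agg.get(); // reads while ingestion runs
        });
        ingest.start();
        query.start();
        try {
            ingest.join();
            query.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return agg.get().size();
    }
}
```

The cost of this approach is that every call pays for the monitor, including the common single-threaded query-time use, which is the overhead a concurrent union would aim to remove.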

If you're looking at "aggregate" and "get" in this file, those are the two
that could get called simultaneously:
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel 
wrote:

> Apologies, I must be missing something very basic in how incremental
> indexing works. A sketch is by itself an aggregator: it can absorb
> millions of updates before it exceeds its space limit or is flushed to disk.
> I assumed the ingestion thread aggregates data in multiple sketches in
> parallel, then at query time a union operation is invoked to merge relevant
> sketches based on the attributes of the query, and when the union is
> completed its result is returned to the user. But in such a scenario there is
> no need to call get before the union is completed.
> This means there is another scenario where a union is used and can be
> queried while it is in the process of executing the merge. Is this to
> maintain some in-memory hierarchy of aggregations, or to create the
> snapshots that are flushed to disk?
> A better understanding of the use case will help in presenting a better
> thread-safe solution.
> Thanks,
> Eshcar

Re: Question about sketches aggregation in druid

2018-07-22 Thread Eshcar Hillel
Thanks Gian - I was missing the part about aggregation during ingestion-time
roll-up.
I looked at the SketchAggregator code and read the Druid overview document;
let me verify that I got this right. Consider the best-effort roll-up mode: as
events arrive they are ingested into multiple segments, call these s0...s9,
which should all belong to a single segment. Then a roll-up process ru
aggregates s0...s9 one-by-one (?) into a single segment. During the roll-up, ru
can be queried and therefore needs to be thread safe.
1) Who is the "owner" of the roll-up process? What triggers the roll-up thread?
Is it considered part of the ingestion/indexing time, or is it done in the
background as a kind of optimization?

2) The document says "Data is queryable as soon as it is ingested by the
realtime processing logic." Does this mean that queries can apply get to
s0...s9? Should they be thread safe as well?
 

On Thursday, July 19, 2018, 10:16:34 PM GMT+3, Gian Merlino wrote:

Hi Eshcar,

I don't think I 100% understand what you are asking, but I will say some
things, and hopefully they will be helpful.

In Druid we use aggregators for two things: aggregation during ingestion
(for ingestion-time rollup) and aggregation during queries. During queries
the aggregators are only ever used by one thread at a time. At ingestion
time, "aggregate" and "get" can be called simultaneously. It happens
because "aggregate" is called from an ingestion thread (because we update
running aggregators during ingestion), and "get" is called by query threads
(because they "get" those aggregator values from the ingestion aggregator
object to feed them to a query aggregator object). These calls are not
synchronized by Druid, so individual aggregators need to do it themselves
if necessary. There was some effort to address this systematically:
https://github.com/apache/incubator-druid/pull/3956, although it hasn't
been finished yet. Check out some of the discussion on that patch for more
background, and a question I just posted there: does it make more sense for
ingestion-time aggregator thread-safety to be handled systematically (at
the IncrementalIndex) or for each aggregator to need to be thread safe?

If you're looking at "aggregate" and "get" in this file, those are the two
that could get called simultaneously:
https://github.com/apache/incubator-druid/blob/master/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java

On Sun, Jul 15, 2018 at 12:11 AM Eshcar Hillel 
wrote:

> Apologies, I must be missing something very basic in how incremental
> indexing works. A sketch is by itself an aggregator: it can absorb
> millions of updates before it exceeds its space limit or is flushed to disk.
> I assumed the ingestion thread aggregates data in multiple sketches in
> parallel, then at query time a union operation is invoked to merge relevant
> sketches based on the attributes of the query, and when the union is
> completed its result is returned to the user. But in such a scenario there is
> no need to call get before the union is completed.
> This means there is another scenario where a union is used and can be
> queried while it is in the process of executing the merge. Is this to
> maintain some in-memory hierarchy of aggregations, or to create the
> snapshots that are flushed to disk?
> A better understanding of the use case will help in presenting a better
> thread-safe solution.
> Thanks,
> Eshcar
>
>    On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino <
> g...@apache.org> wrote:
>
>  Hi Eshcar,
>
> > But even in a single-writer-single-reader scenario, removing the lock can
> > increase the throughput of accesses to the object.
>
> Definitely worth trying this out, imo.
>
> > However, I don't understand why the union object is read before the
> > result is ready.
>
> It's used as part of incremental indexing: the idea is that we create
> aggregates during ingestion time and we want those to be queryable even
> while ingestion is still ongoing. So the ingestion thread will be calling
> "aggregate" and a query thread will be calling "get" potentially
> simultaneously.

Re: Question about sketches aggregation in druid

2018-07-15 Thread Eshcar Hillel
Hi Himanshu,
We did not experiment with the union operation, but these are toy-example
throughput numbers we got for a *single* thread writing to a theta sketch:
- basic (sketches-core): 167M
- lock-based: 32M
- our concurrent: 115M (223M with 2 threads, scaling linearly)
The experiments were done in a single-sketch environment, and the results might
be different with multiple sketches; however, they capture the potential gain of
removing the lock.
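
Reading the single-thread figures above as ratios (in whatever throughput unit the M figures use) gives a rough sense of the lock's cost; this is only arithmetic on the reported toy numbers, not an additional measurement:

$$
\frac{167}{32} \approx 5.2, \qquad \frac{115}{32} \approx 3.6, \qquad \frac{115}{167} \approx 0.69, \qquad \frac{223}{115} \approx 1.94
$$

That is, the lock-based variant ran at roughly a fifth of the basic sketch's single-thread throughput, the concurrent variant at about 69% of it (about 3.6 times the lock-based one), and a second thread nearly doubled the concurrent figure.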
Thanks,
Eshcar

On Wednesday, July 11, 2018, 2:26:53 AM GMT+3, Himanshu wrote:

Yes, SynchronizedUnion was added after seeing concurrency errors in the
processes doing realtime ingestion. In other processes it is not called
concurrently, and the hope is that synchronization does not cause overhead
when that piece of code is only called from one thread, because it gets
optimized.

On Tue, Jul 10, 2018 at 10:13 AM, Gian Merlino  wrote:

> Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel 
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library. I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in Druid. From what I see in the code, the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically takes a lock on every single access (update/read) of the union
> > operation. We believe a concurrent thread-safe implementation of the union
> > operation can help reduce the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> > option.
> > Thanks,
> > Eshcar
> >
> >
> >
>
  

Re: Question about sketches aggregation in druid

2018-07-15 Thread Eshcar Hillel
Apologies, I must be missing something very basic in how incremental indexing
works. A sketch is by itself an aggregator: it can absorb millions of updates
before it exceeds its space limit or is flushed to disk.
I assumed the ingestion thread aggregates data in multiple sketches in
parallel, then at query time a union operation is invoked to merge relevant
sketches based on the attributes of the query, and when the union is completed
its result is returned to the user. But in such a scenario there is no need to
call get before the union is completed.
This means there is another scenario where a union is used and can be queried
while it is in the process of executing the merge. Is this to maintain some
in-memory hierarchy of aggregations, or to create the snapshots that are
flushed to disk?
A better understanding of the use case will help in presenting a better
thread-safe solution.
Thanks,
Eshcar

On Wednesday, July 11, 2018, 7:51:24 PM GMT+3, Gian Merlino wrote:

Hi Eshcar,

> But even in a single-writer-single-reader scenario, removing the lock can
> increase the throughput of accesses to the object.

Definitely worth trying this out, imo.

> However, I don't understand why the union object is read before the
> result is ready.

It's used as part of incremental indexing: the idea is that we create
aggregates during ingestion time and we want those to be queryable even
while ingestion is still ongoing. So the ingestion thread will be calling
"aggregate" and a query thread will be calling "get" potentially
simultaneously.

On Wed, Jul 11, 2018 at 1:04 AM Eshcar Hillel 
wrote:

> Thanks Gian,
> This is also my understanding. But even in a single-writer-single-reader
> scenario, removing the lock can increase the throughput of accesses to the
> object.
> If the union is only used to produce the result at query time, then
> removing the lock would not affect ingestion throughput, but could decrease
> query latency. However, I don't understand why the union object is read
> before the result is ready.
>    On Tuesday, July 10, 2018, 8:13:36 PM GMT+3, Gian Merlino <
> g...@apache.org> wrote:
>
>  Hi Eshcar,
>
> To my knowledge, in the Druid Aggregator and BufferAggregator interfaces,
> the main place where concurrency happens is that "aggregate" and "get" may
> be called simultaneously during realtime ingestion. So if there would be a
> benefit from improving concurrency it would probably end up in that area.
>
> On Tue, Jul 10, 2018 at 2:10 AM Eshcar Hillel 
> wrote:
>
> > Hi All,
> > My name is Eshcar Hillel from Oath research. I'm currently working with
> > Lee Rhodes on committing a new concurrent implementation of the theta
> > sketch to the sketches-core library. I was wondering whether this
> > implementation can help boost the union operation that is applied to
> > multiple sketches at query time in Druid. From what I see in the code, the
> > sketch aggregator uses the SynchronizedUnion implementation, which
> > basically takes a lock on every single access (update/read) of the union
> > operation. We believe a concurrent thread-safe implementation of the union
> > operation can help reduce the inherent overhead of the lock.
> > I will be happy to join the meeting today and briefly discuss this
> > option.
> > Thanks,
> > Eshcar
> >
> >
> >
>
  

Question about sketches aggregation in druid

2018-07-10 Thread Eshcar Hillel
Hi All,
My name is Eshcar Hillel from Oath research. I'm currently working with Lee
Rhodes on committing a new concurrent implementation of the theta sketch to the
sketches-core library. I was wondering whether this implementation can help
boost the union operation that is applied to multiple sketches at query time in
Druid. From what I see in the code, the sketch aggregator uses the
SynchronizedUnion implementation, which basically takes a lock on every single
access (update/read) of the union operation. We believe a concurrent
thread-safe implementation of the union operation can help reduce the inherent
overhead of the lock.
I will be happy to join the meeting today and briefly discuss this option.
Thanks,
Eshcar