Re: [Design discussion] - Kudu Input operator

2017-08-24 Thread Ananth G
Hello Thomas,

Thanks for the additional comments. Replies inline marked [Ananth]>>>

Regards,
Ananth
> On 22 Aug 2017, at 2:10 pm, Thomas Weise wrote:
> 
> -->
> 
> Thanks,
> Thomas
> 
> 
> On Sat, Aug 19, 2017 at 2:07 PM, Ananth G wrote:
> 
>> Hello Thomas,
>> 
>> Replies inline marked [Ananth]>>
>> 
>> Apologies for the somewhat longer description, as I think the
>> description needs more clarity.
>> 
>> Regards,
>> Ananth
>> 
>>> On 19 Aug 2017, at 11:10 am, Thomas Weise wrote:
>>> 
>>> Hi Ananth,
>>> 
>>> Nice writeup, couple questions/comments inline ->
>>> 
>>> On Tue, Aug 15, 2017 at 2:02 PM, Ananth G wrote:
>>> 
 Hello All,
 
 The implementation for Apex Kudu Input Operator is ready for a pull
 request. Before raising the pull request, I would like to get any inputs
 regarding the design and incorporate any feedback before raising the
>> pull
 request in the next couple of days for the following JIRA.
 
 https://issues.apache.org/jira/browse/APEXMALHAR-2472
 
 The following are the main features that would be supported by the Input
 operator:
 
 - The input operator would be used to scan all or some rows of a single
 kudu table.
 - Each Kudu row is translated to a POJO for downstream operators.
 - The Input operator would accept an SQL expression ( described in
>> detail
 below) that would be parsed to generate the equivalent scanner code for
>> the
 Kudu Table. This is because the Kudu Table API does not support SQL
 expressions.
 - The SQL expression would have additional options that would help in
 Apache Apex design patterns ( Ex: Sending a control tuple message after
>> a
 query is successfully processed )
 - The Input operator works on a continuous basis, i.e. it would accept
>> the
 next query once the current query is complete.
 
>>> 
>>> This means the operator will repeat the query to fetch newly added rows,
>>> similar to what the JDBC poll operator does, correct?
>> [Ananth]>> Yes. All of this design is covered by the abstract
>> implementation. In fact, there is a default implementation of the abstract
>> operator that does exactly this. This default implementation is
>> called IncrementalStepScanInputOperator. Driven by a properties file, this
>> operator can be used to implement the JDBC poll operator
>> functionality, using a timestamp column as the incremental step value.
>> 
>> The design however does not limit us to only this pattern but can
>> accommodate other patterns as well. Here is what I want to add in this
>> context:
>>- An additional pattern is a “time travel pattern”. Since Kudu
>> is an MVCC engine ( and if appropriately configured ), we can use this
>> operator to answer questions like “Can I stream the entire Kudu table, or a
>> subset of it, as of 1 AM, 2 AM, 3 AM of today?” even though the current
>> time could be 6 P.M. ( This is enabled by specifying READ_SNAPSHOT_TIME,
>> which is a supported option of the SQL grammar we are enabling for this
>> operator )
>> 
> 
> So this could be used to do event time based processing based on the
> snapshot time (without a timestamp column)?
> 

 [Ananth]>>> Yes. That is correct. 

> 
>>- Another interesting pattern is when the next query has no
>> correlation with the previous query. Example use cases include an
>> Apex-cli equivalent or a possible future use case like Apache Zeppelin
>> integration. A query comes in ad hoc and the values can be streamed for
>> the current incoming expression, i.e. when we want to enable interactive
>> query based streaming.
>> 
>>> 
>>> - The operator will work in a distributed fashion for the input query.
>> This
 essentially means for a single input query, the scan work is distributed
 among all of the physical instances of the input operator.
 - Kudu splits a table into chunks of data regions called Tablets. The
 tablets are replicated and partitioned  (range and hash partitions are
 supported ) in Kudu according to the Kudu Table definition. The operator
 allows partitioning of the Input Operator to be done in 2 ways.
   - Map many Kudu Tablets to one partition of the Apex Kudu Input
 operator
   - One Kudu Tablet maps to one partition of the Apex Kudu Input
 operator
 - The partitioning does not change on a per query basis. This is because
 of the complex use cases that would arise. For example, if the query 

Re: [Design discussion] - Kudu Input operator

2017-08-21 Thread Thomas Weise
-->

Thanks,
Thomas


On Sat, Aug 19, 2017 at 2:07 PM, Ananth G wrote:

> Hello Thomas,
>
> Replies inline marked [Ananth]>>
>
> Apologies for the somewhat longer description, as I think the
> description needs more clarity.
>
> Regards,
> Ananth
>
> > On 19 Aug 2017, at 11:10 am, Thomas Weise wrote:
> >
> > Hi Ananth,
> >
> > Nice writeup, couple questions/comments inline ->
> >
> > On Tue, Aug 15, 2017 at 2:02 PM, Ananth G wrote:
> >
> >> Hello All,
> >>
> >> The implementation for Apex Kudu Input Operator is ready for a pull
> >> request. Before raising the pull request, I would like to get any inputs
> >> regarding the design and incorporate any feedback before raising the
> pull
> >> request in the next couple of days for the following JIRA.
> >>
> >> https://issues.apache.org/jira/browse/APEXMALHAR-2472
> >>
> >> The following are the main features that would be supported by the Input
> >> operator:
> >>
> >> - The input operator would be used to scan all or some rows of a single
> >> kudu table.
> >> - Each Kudu row is translated to a POJO for downstream operators.
> >> - The Input operator would accept an SQL expression ( described in
> detail
> >> below) that would be parsed to generate the equivalent scanner code for
> the
> >> Kudu Table. This is because the Kudu Table API does not support SQL
> >> expressions.
> >> - The SQL expression would have additional options that would help in
> >> Apache Apex design patterns ( Ex: Sending a control tuple message after
> a
> >> query is successfully processed )
> >> - The Input operator works on a continuous basis, i.e. it would accept
> the
> >> next query once the current query is complete.
> >>
> >
> > This means the operator will repeat the query to fetch newly added rows,
> > similar to what the JDBC poll operator does, correct?
> [Ananth]>> Yes. All of this design is covered by the abstract
> implementation. In fact, there is a default implementation of the abstract
> operator that does exactly this. This default implementation is
> called IncrementalStepScanInputOperator. Driven by a properties file, this
> operator can be used to implement the JDBC poll operator
> functionality, using a timestamp column as the incremental step value.
>
> The design however does not limit us to only this pattern but can
> accommodate other patterns as well. Here is what I want to add in this
> context:
> - An additional pattern is a “time travel pattern”. Since Kudu
> is an MVCC engine ( and if appropriately configured ), we can use this
> operator to answer questions like “Can I stream the entire Kudu table, or a
> subset of it, as of 1 AM, 2 AM, 3 AM of today?” even though the current
> time could be 6 P.M. ( This is enabled by specifying READ_SNAPSHOT_TIME,
> which is a supported option of the SQL grammar we are enabling for this
> operator )
>

So this could be used to do event time based processing based on the
snapshot time (without a timestamp column)?


> - Another interesting pattern is when the next query has no
> correlation with the previous query. Example use cases include an
> Apex-cli equivalent or a possible future use case like Apache Zeppelin
> integration. A query comes in ad hoc and the values can be streamed for
> the current incoming expression, i.e. when we want to enable interactive
> query based streaming.
>
> >
> > - The operator will work in a distributed fashion for the input query.
> This
> >> essentially means for a single input query, the scan work is distributed
> >> among all of the physical instances of the input operator.
> >> - Kudu splits a table into chunks of data regions called Tablets. The
> >> tablets are replicated and partitioned  (range and hash partitions are
> >> supported ) in Kudu according to the Kudu Table definition. The operator
> >> allows partitioning of the Input Operator to be done in 2 ways.
> >>- Map many Kudu Tablets to one partition of the Apex Kudu Input
> >> operator
> >>- One Kudu Tablet maps to one partition of the Apex Kudu Input
> >> operator
> >> - The partitioning does not change on a per query basis. This is because
> >> of the complex use cases that would arise. For example, if the query is
> >> touching only a few rows before the next query is accepted, it would
> result
> >> in a lot of churn in terms of operator serialize/deserialize, YARN
> >> allocation requests etc. Also supporting per query partition planning
> leads
> >> to possibly very complex implementation and poor resource usage as all
> >> physical instances of the operator have to wait for their peers to
> complete
> >> their scans and wait for the next checkpoint to get repartitioned.
> >>
> >
> > Agreed, 

Re: [Design discussion] - Kudu Input operator

2017-08-19 Thread Ananth G
Hello Thomas,

Replies inline marked [Ananth]>>

Apologies for the somewhat longer description, as I think the description
needs more clarity. 

Regards,
Ananth

> On 19 Aug 2017, at 11:10 am, Thomas Weise wrote:
> 
> Hi Ananth,
> 
> Nice writeup, couple questions/comments inline ->
> 
> On Tue, Aug 15, 2017 at 2:02 PM, Ananth G wrote:
> 
>> Hello All,
>> 
>> The implementation for Apex Kudu Input Operator is ready for a pull
>> request. Before raising the pull request, I would like to get any inputs
>> regarding the design and incorporate any feedback before raising the pull
>> request in the next couple of days for the following JIRA.
>> 
>> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>> 
>> The following are the main features that would be supported by the Input
>> operator:
>> 
>> - The input operator would be used to scan all or some rows of a single
>> kudu table.
>> - Each Kudu row is translated to a POJO for downstream operators.
>> - The Input operator would accept an SQL expression ( described in detail
>> below) that would be parsed to generate the equivalent scanner code for the
>> Kudu Table. This is because the Kudu Table API does not support SQL
>> expressions.
>> - The SQL expression would have additional options that would help in
>> Apache Apex design patterns ( Ex: Sending a control tuple message after a
>> query is successfully processed )
>> - The Input operator works on a continuous basis, i.e. it would accept the
>> next query once the current query is complete.
>> 
> 
> This means the operator will repeat the query to fetch newly added rows,
> similar to what the JDBC poll operator does, correct?
[Ananth]>> Yes. All of this design is covered by the abstract implementation.
In fact, there is a default implementation of the abstract operator that does
exactly this. This default implementation is called
IncrementalStepScanInputOperator. Driven by a properties file, this operator
can be used to implement the JDBC poll operator functionality, using a
timestamp column as the incremental step value.
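
For illustration, wiring this default implementation into an application could look roughly like the sketch below. This is not the final API: the setter names, the output port name and the console sink are assumptions for the example; only IncrementalStepScanInputOperator is from the design above.

import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class KuduIncrementalScanApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Default implementation of the abstract Kudu input operator:
    IncrementalStepScanInputOperator input =
        dag.addOperator("kuduInput", new IncrementalStepScanInputOperator());
    // The scan template and the incremental step column would normally be
    // supplied via the properties file; these setter names are assumptions:
    // input.setQueryTemplate("SELECT * FROM transactions WHERE ts > :lastScannedValue");
    // input.setIncrementalColumnName("ts");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("kuduRows", input.outputPort, console.input); // port name assumed
  }
}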

The design however does not limit us to only this pattern but can accommodate
other patterns as well. Here is what I want to add in this context:
- An additional pattern is a “time travel pattern”. Since Kudu is an
MVCC engine ( and if appropriately configured ), we can use this operator to
answer questions like “Can I stream the entire Kudu table, or a subset of it,
as of 1 AM, 2 AM, 3 AM of today?” even though the current time could be
6 P.M. ( This is enabled by specifying READ_SNAPSHOT_TIME, which is a
supported option of the SQL grammar we are enabling for this operator; see the
sketch after this list )
- Another interesting pattern is when the next query has no correlation
with the previous query. Example use cases include an Apex-cli equivalent
or a possible future use case like Apache Zeppelin integration. A query
comes in ad hoc and the values can be streamed for the current incoming
expression, i.e. when we want to enable interactive query based streaming.
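
To make the time travel option concrete: at the Kudu level, READ_SNAPSHOT_TIME would boil down to a snapshot read. A minimal sketch against the Kudu Java client, assuming a master at kudu-master:7051, a table named transactions and a snapshot taken one hour ago:

import org.apache.kudu.client.AsyncKuduScanner;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;

public class SnapshotScanSketch
{
  public static void main(String[] args) throws KuduException
  {
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
    KuduTable table = client.openTable("transactions");
    // Kudu snapshot timestamps are in microseconds since the epoch.
    long snapshotMicros = (System.currentTimeMillis() - 3600_000L) * 1000L;
    KuduScanner scanner = client.newScannerBuilder(table)
        .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
        .snapshotTimestampMicros(snapshotMicros)
        .build();
    while (scanner.hasMoreRows()) {
      RowResultIterator rows = scanner.nextRows();
      for (RowResult row : rows) {
        // each row would be translated to a POJO and emitted downstream
        System.out.println(row.rowToString());
      }
    }
    client.shutdown();
  }
}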

> 
> - The operator will work in a distributed fashion for the input query. This
>> essentially means for a single input query, the scan work is distributed
>> among all of the physical instances of the input operator.
>> - Kudu splits a table into chunks of data regions called Tablets. The
>> tablets are replicated and partitioned  (range and hash partitions are
>> supported ) in Kudu according to the Kudu Table definition. The operator
>> allows partitioning of the Input Operator to be done in 2 ways.
>>- Map many Kudu Tablets to one partition of the Apex Kudu Input
>> operator
>>- One Kudu Tablet maps to one partition of the Apex Kudu Input
>> operator
>> - The partitioning does not change on a per query basis. This is because
>> of the complex use cases that would arise. For example, if the query is
>> touching only a few rows before the next query is accepted, it would result
>> in a lot of churn in terms of operator serialize/deserialize, YARN
>> allocation requests etc. Also supporting per query partition planning leads
>> to possibly very complex implementation and poor resource usage as all
>> physical instances of the operator have to wait for their peers to complete
>> their scans and wait for the next checkpoint to get repartitioned.
>> 
> 
> Agreed, what would be a reason to change partitioning between queries
> though?
> 
[Ananth]>> I was making that note more in the context of dynamic partitioning. My
current understanding is that dynamic partitioning is entirely based on the
performance stats, and I felt that does not make exact sense when the
stats need to be entirely dependent on the business logic ( query in

Re: [Design discussion] - Kudu Input operator

2017-08-18 Thread Thomas Weise
Hi Ananth,

Nice writeup, couple questions/comments inline ->

On Tue, Aug 15, 2017 at 2:02 PM, Ananth G wrote:

> Hello All,
>
> The implementation for Apex Kudu Input Operator is ready for a pull
> request. Before raising the pull request, I would like to get any inputs
> regarding the design and incorporate any feedback before raising the pull
> request in the next couple of days for the following JIRA.
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2472
>
> The following are the main features that would be supported by the Input
> operator:
>
> - The input operator would be used to scan all or some rows of a single
> kudu table.
> - Each Kudu row is translated to a POJO for downstream operators.
> - The Input operator would accept an SQL expression ( described in detail
> below) that would be parsed to generate the equivalent scanner code for the
> Kudu Table. This is because the Kudu Table API does not support SQL
> expressions.
> - The SQL expression would have additional options that would help in
> Apache Apex design patterns ( Ex: Sending a control tuple message after a
> query is successfully processed )
> - The Input operator works on a continuous basis, i.e. it would accept the
> next query once the current query is complete.
>

This means the operator will repeat the query to fetch newly added rows,
similar to what the JDBC poll operator does, correct?
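
For illustration, each repetition of such a poll could translate the incremental condition into Kudu scanner predicates roughly as follows. This is a sketch; the transactions table and its ts column are assumptions, not part of the proposal.

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.RowResult;

public class IncrementalPollSketch
{
  // One poll cycle: fetch only rows newer than the last scanned timestamp
  // and return the new high-water mark for the next cycle.
  static long pollOnce(KuduClient client, long lastScannedTs) throws KuduException
  {
    KuduTable table = client.openTable("transactions");
    ColumnSchema tsCol = table.getSchema().getColumn("ts");
    KuduScanner scanner = client.newScannerBuilder(table)
        .addPredicate(KuduPredicate.newComparisonPredicate(
            tsCol, KuduPredicate.ComparisonOp.GREATER, lastScannedTs))
        .build();
    long highWaterMark = lastScannedTs;
    while (scanner.hasMoreRows()) {
      for (RowResult row : scanner.nextRows()) {
        highWaterMark = Math.max(highWaterMark, row.getLong("ts"));
        // translate the row to a POJO and emit it downstream
      }
    }
    return highWaterMark;
  }
}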

- The operator will work in a distributed fashion for the input query. This
> essentially means for a single input query, the scan work is distributed
> among all of the physical instances of the input operator.
> - Kudu splits a table into chunks of data regions called Tablets. The
> tablets are replicated and partitioned  (range and hash partitions are
> supported ) in Kudu according to the Kudu Table definition. The operator
> allows partitioning of the Input Operator to be done in 2 ways.
> - Map many Kudu Tablets to one partition of the Apex Kudu Input
> operator
> - One Kudu Tablet maps to one partition of the Apex Kudu Input
> operator
> - The partitioning does not change on a per query basis. This is because
> of the complex use cases that would arise. For example, if the query is
> touching only a few rows before the next query is accepted, it would result
> in a lot of churn in terms of operator serialize/deserialize, YARN
> allocation requests etc. Also supporting per query partition planning leads
> to possibly very complex implementation and poor resource usage as all
> physical instances of the operator have to wait for their peers to complete
> their scans and wait for the next checkpoint to get repartitioned.
>

Agreed, what would be a reason to change partitioning between queries
though?
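
For reference, the round-robin distribution of a single query's scan work could be sketched with Kudu scan tokens as below; the partition count is an assumption, and the serialized tokens are what would be shipped to the physical operator instances.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduScanToken;
import org.apache.kudu.client.KuduTable;

public class TokenPartitionSketch
{
  // Assign a query's scan tokens to physical operator instances round robin.
  static List<List<byte[]>> assign(KuduClient client, KuduTable table, int numPartitions)
      throws IOException
  {
    List<KuduScanToken> tokens = client.newScanTokenBuilder(table).build();
    List<List<byte[]>> perPartition = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      perPartition.add(new ArrayList<byte[]>());
    }
    for (int i = 0; i < tokens.size(); i++) {
      // A physical instance later rebuilds a scanner from the bytes via
      // KuduScanToken.deserializeIntoScanner(bytes, client).
      perPartition.get(i % numPartitions).add(tokens.get(i).serialize());
    }
    return perPartition;
  }
}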


> - The partitioner splits the work load of a single query in a round robin
> fashion. After a query plan is generated , each scan token range is
> distributed equally among the physical operator instances.
> - The operator allows for two modes of scanning for an application (
> Cannot be changed on a per query basis )
> - Consistent Order scanner - only one tablet scan thread is active
> at any given point in time for a given query
> - Random Order scanner - Many threads are active to scan Kudu
> tablets in parallel
> - As can be seen, Consistent order scanner would be slower but would help
> in better “exactly once” implementations if the correct method is
> overridden in the operator.
>

Can you elaborate on this a bit more? Ordering within a streaming window
generally isn't deterministic when you have a shuffle or stream merge. And
the association between input records and streaming windows can be made
deterministic by using the window data manager?
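
For context, the difference between the two scan modes comes down to the threading sketched below: one scanner thread at a time yields a reproducible row sequence across replays, while many threads trade that determinism for throughput. The executor wiring and names are assumptions, not the operator's actual code.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kudu.client.KuduScanToken;

public class ScanOrderSketch
{
  static void scanAll(List<KuduScanToken> tokens, boolean consistentOrder)
  {
    // Consistent order: one thread drains tokens strictly one after another.
    // Random order: one thread per token scans tablets in parallel.
    ExecutorService pool = consistentOrder
        ? Executors.newSingleThreadExecutor()
        : Executors.newFixedThreadPool(tokens.size());
    for (final KuduScanToken token : tokens) {
      pool.submit(new Runnable() {
        @Override
        public void run()
        {
          // rebuild a scanner from the token and push rows to the buffer
          // feeding emitTuples() (omitted in this sketch)
        }
      });
    }
    pool.shutdown();
  }
}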


> - The operator introduces the DisruptorBlockingQueue for low latency
> buffer management. The LMAX disruptor library was considered and, based on
> discussion threads on other Apache projects, we settled on the
> ConversantMedia implementation of the Disruptor Blocking queue. This
> blocking queue is used when the kudu scanner thread wants to send the
> scanned row into the input operator's main thread emitTuples() call.
> - The operator allows for exactly once semantics if the user specifies the
> logic for reconciling a possible duplicate row in situations when the
> operator is resuming from a checkpoint. This is done by overriding a method
> that returns a boolean ( true to emit the tuple and false to suppress the
> tuple ) when the operator is working in the reconciling window phase. As
> can be seen, this reconciling phase is only active for at most one
> window.
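
A sketch of what that overridable hook might look like; the class and method names here are assumptions, not the operator's actual API.

public class ReconcilingFilterSketch
{
  // Consulted only while the operator replays the single reconciling window
  // after resuming from a checkpoint: return true to emit the row, false to
  // suppress a row that may already have been sent downstream.
  protected boolean isAllowedInReconcilingWindow(long rowTimestamp, long lastEmittedTimestamp)
  {
    return rowTimestamp > lastEmittedTimestamp; // example policy only
  }
}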

> - The operator uses the FSWindowManager to manage metadata at the end of
> every window. On resuming from a checkpoint, the operator will still scan
> the Kudu tablets but simply not emit all rows that were already streamed
> downstream. Subsequently when the operator is in the reconciling window,
> the 

[Design discussion] - Kudu Input operator

2017-08-15 Thread Ananth G
Hello All,

The implementation for Apex Kudu Input Operator is ready for a pull request. 
Before raising the pull request, I would like to get any inputs regarding the 
design and incorporate any feedback before raising the pull request in the next 
couple of days for the following JIRA.

https://issues.apache.org/jira/browse/APEXMALHAR-2472 


The following are the main features that would be supported by the Input 
operator:

- The input operator would be used to scan all or some rows of a single kudu 
table.
- Each Kudu row is translated to a POJO for downstream operators. 
- The Input operator would accept an SQL expression ( described in detail 
below) that would be parsed to generate the equivalent scanner code for the 
Kudu Table. This is because the Kudu Table API does not support SQL expressions.
- The SQL expression would have additional options that would help in Apache 
Apex design patterns ( Ex: Sending a control tuple message after a query is 
successfully processed )
- The Input operator works on a continuous basis, i.e. it would accept the next
query once the current query is complete.
- The operator will work in a distributed fashion for the input query. This 
essentially means for a single input query, the scan work is distributed among 
all of the physical instances of the input operator.
- Kudu splits a table into chunks of data regions called Tablets. The tablets 
are replicated and partitioned  (range and hash partitions are supported ) in 
Kudu according to the Kudu Table definition. The operator allows partitioning 
of the Input Operator to be done in 2 ways. 
- Map many Kudu Tablets to one partition of the Apex Kudu Input operator
- One Kudu Tablet maps to one partition of the Apex Kudu Input operator
- The partitioning does not change on a per query basis. This is because of the 
complex use cases that would arise. For example, if the query is touching only 
a few rows before the next query is accepted, it would result in a lot of churn 
in terms of operator serialize/deserialize, YARN allocation requests etc. Also
supporting per query partition planning leads to possibly very complex 
implementation and poor resource usage as all physical instances of the 
operator have to wait for their peers to complete their scans and wait for the
next checkpoint to get repartitioned.
- The partitioner splits the work load of a single query in a round robin 
fashion. After a query plan is generated , each scan token range is distributed 
equally among the physical operator instances.
- The operator allows for two modes of scanning for an application ( Cannot be 
changed on a per query basis ) 
- Consistent Order scanner - only one tablet scan thread is active at 
any given point in time for a given query
- Random Order scanner - Many threads are active to scan Kudu tablets 
in parallel
- As can be seen, Consistent order scanner would be slower but would help in 
better “exactly once” implementations if the correct method is overridden in 
the operator.
- The operator introduces the DisruptorBlockingQueue for low latency buffer
management. The LMAX disruptor library was considered and, based on
discussion threads on other Apache projects, we settled on the ConversantMedia
implementation of the Disruptor Blocking queue. This blocking queue is used 
when the kudu scanner thread wants to send the scanned row into the input 
operator's main thread emitTuples() call.
- The operator allows for exactly once semantics if the user specifies the 
logic for reconciling a possible duplicate row in situations when the operator 
is resuming from a checkpoint. This is done by overriding a method that returns 
a boolean ( true to emit the tuple and false to suppress the tuple ) when the 
operator is working in the reconciling window phase. As can be seen, this
reconciling phase is only active for at most one window.
- The operator uses the FSWindowManager to manage metadata at the end of every 
window. On resuming from a checkpoint, the operator will still scan the Kudu
tablets but simply not emit all rows that were already streamed downstream. 
Subsequently when the operator is in the reconciling window, the method 
described above is invoked to allow duplicate filtering. After this
reconciling window, the operator works in the normal mode of operation.
- The following are the additional configurable aspects of the operator
- Max tuples per window
- Spin policy and the buffer size for the Disruptor Blocking Queue
- Mechanism to provide custom control tuples if required
- Setting the number of physical operator instances via the API if
required. 
- Setting the fault tolerance. If fault tolerant, an alternative
replica of the Kudu tablet is picked up for scanning if the initial tablet 
fails for whatever reason. However this slows down the scan throughput. Hence 
it is