Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-30 Thread Guozhang Wang
Hmm, I'm not sure if we can optimize this case as well. Following your
example:

KTable<String, Integer> T1 = builder.table("source-topic");
KTable<String, Integer> T2 = T1.filter((key, value) -> value > 2);


And suppose the "source-topic" is piping the messages to T1 as: {a: 3}, {b:
5}, {a: 1}...

When {a: 3} is passed from T1 to T2, the filter passes and hence it is
forwarded to downstream operators right away. When {a: 1} is later passed
from T1 to T2, meaning "modify the value for key a from 3 to 1", the filter
no longer passes, so in this case we need to forward an {a: null} record
downstream to indicate that the previously forwarded {a: 3} has now been
deleted from T2, right?
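A minimal sketch of the semantics described above, assuming the filter receives each update as an {old -> new} pair (i.e. with old values enabled). FilterWithOldValues and its Change class are hypothetical stand-ins, not Kafka's KTableFilter: the predicate is applied to both sides of the pair, and the pair is forwarded unless both sides fail, so the {a: 3} -> {a: 1} update turns into a retraction of key a in T2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch, not Kafka's KTableFilter.
final class FilterWithOldValues {

    /** One update to a key: value before and after (null means absent). */
    static final class Change {
        final Integer oldValue;
        final Integer newValue;

        Change(Integer oldValue, Integer newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    private final Predicate<Integer> predicate;
    final List<String> forwarded = new ArrayList<>(); // what would be sent downstream

    FilterWithOldValues(Predicate<Integer> predicate) {
        this.predicate = predicate;
    }

    /** The value as seen through the filter: itself if it passes, otherwise null. */
    private Integer filtered(Integer value) {
        return (value != null && predicate.test(value)) ? value : null;
    }

    void process(String key, Change change) {
        Integer oldOut = filtered(change.oldValue);
        Integer newOut = filtered(change.newValue);
        if (oldOut == null && newOut == null) {
            return; // key was not visible in T2 before and is not now: nothing to forward
        }
        // pass -> pass: same pair; pass -> fail: retraction {old -> null}; fail -> pass: {null -> new}
        forwarded.add(key + ": " + new Change(oldOut, newOut));
    }

    public static void main(String[] args) {
        FilterWithOldValues t2 = new FilterWithOldValues(v -> v > 2);
        t2.process("a", new Change(null, 3)); // a: 3 passes
        t2.process("b", new Change(null, 5)); // b: 5 passes
        t2.process("a", new Change(3, 1));    // a: 3 -> 1 no longer passes
        t2.process("c", new Change(null, 0)); // c: 0 never passes, so nothing is sent
        t2.forwarded.forEach(System.out::println);
    }
}
```

Running main prints `a: {null -> 3}`, `b: {null -> 5}` and `a: {3 -> null}`; the last line is the {a: null} record downstream operators need.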

Anyway, we could move our discussion to the PR so as not to swamp the dev
mailing list.


Guozhang


On Thu, Jun 30, 2016 at 5:43 AM, Philippe Derome  wrote:

> Guozhang,
>
> My latest commit proposes that the semantics of your JIRA case 2 be
> changed slightly to suppress nulls when not sending old values and not
> materializing. When a table T2 is first created from another table T1 and
> the filter does not match for a key k from T1, the rejected key k does not
> enter T2 at all (no null). Ultimately, the code change is simpler and the
> test results look more intuitive.

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-30 Thread Philippe Derome
Guozhang,

My latest commit proposes that the semantics of your JIRA case 2 be
changed slightly to suppress nulls when not sending old values and not
materializing. When a table T2 is first created from another table T1 and
the filter does not match for a key k from T1, the rejected key k does not
enter T2 at all (no null). Ultimately, the code change is simpler and the
test results look more intuitive.
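For comparison, a hypothetical sketch of a filter that sees only new values (no old values, no state store), replaying the {a: 3}, {b: 5}, {a: 1} sequence from Guozhang's 30 Jun reply into a downstream table. It assumes the downstream applies a null record as a delete; FilterWithoutOldValues and suppressNulls are illustrative names. With suppressNulls = true (the proposed semantics) nothing is sent for {a: 1}, so the downstream keeps a stale a = 3; always forwarding the null avoids that, at the cost of also sending nulls for keys that were never forwarded.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical sketch: a filter without access to old values.
final class FilterWithoutOldValues {
    private final Predicate<Integer> predicate;
    private final boolean suppressNulls; // true: proposed semantics; false: always forward a null
    final Map<String, Integer> downstream = new TreeMap<>(); // downstream table applying each record

    FilterWithoutOldValues(Predicate<Integer> predicate, boolean suppressNulls) {
        this.predicate = predicate;
        this.suppressNulls = suppressNulls;
    }

    void process(String key, Integer newValue) {
        if (newValue != null && predicate.test(newValue)) {
            downstream.put(key, newValue); // forward <key, newValue>
        } else if (!suppressNulls) {
            downstream.remove(key); // forward <key, null>, applied downstream as a delete
        }
        // otherwise forward nothing: without the old value the filter cannot
        // know whether this key was forwarded earlier
    }

    public static void main(String[] args) {
        for (boolean suppress : new boolean[] {false, true}) {
            FilterWithoutOldValues f = new FilterWithoutOldValues(v -> v > 2, suppress);
            f.process("a", 3);
            f.process("b", 5);
            f.process("a", 1);
            System.out.println("suppressNulls=" + suppress + " -> " + f.downstream);
        }
    }
}
```

This prints `suppressNulls=false -> {b=5}` followed by `suppressNulls=true -> {a=3, b=5}`.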

On Wed, Jun 29, 2016 at 6:55 PM, Philippe Derome  wrote:

> good.

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-29 Thread Philippe Derome
good.

On Wed, Jun 29, 2016 at 6:44 PM, Guozhang Wang  wrote:

> Yes, they are related in the sense that if we always materialize a source
> KTable, then we can completely replace `sendOldValues`, as it will
> always be true. But since 3911 is a rather big change, I'd prefer to
> complete this ticket first, and refactor it when we decide to work on 3911
> later.
>
> Feel free to link these two tickets though.
>
> Guozhang

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-29 Thread Guozhang Wang
Yes, they are related in the sense that if we always materialize a source
KTable, then we can completely replace `sendOldValues`, as it will
always be true. But since 3911 is a rather big change, I'd prefer to
complete this ticket first, and refactor it when we decide to work on 3911
later.

Feel free to link these two tickets though.

Guozhang
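A hypothetical sketch of why an always-materialized source would make `sendOldValues` redundant: if every update goes through a state store (a HashMap stands in for it here), the previous value is always at hand, so each update can be emitted as an {old -> new} pair. MaterializedSource and Change are illustrative names, not Kafka classes; the price is maintaining that store for every source table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a HashMap stands in for the source KTable's state store.
final class MaterializedSource {

    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    private final Map<String, Integer> store = new HashMap<>();

    /** Upserts (or deletes, for null) and returns the change; the old value comes from the store. */
    Change<Integer> update(String key, Integer newValue) {
        // Map.put and Map.remove both return the previous value, or null if there was none
        Integer oldValue = (newValue == null) ? store.remove(key) : store.put(key, newValue);
        return new Change<>(oldValue, newValue);
    }

    public static void main(String[] args) {
        MaterializedSource t1 = new MaterializedSource();
        System.out.println(t1.update("a", 3)); // {null -> 3}
        System.out.println(t1.update("b", 5)); // {null -> 5}
        System.out.println(t1.update("a", 1)); // {3 -> 1}
    }
}
```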

On Tue, Jun 28, 2016 at 9:47 AM, Philippe Derome  wrote:

> Is this point of view consistent with the new ticket 3911 (Enforce KTable
> materialisation) just submitted by Eno T.?
>
> Should the two tickets be linked somehow if they are related?
> My concern is that the overhead of requesting the source KTable to be
> materialized (i.e. creating a state store, and sending the {old -> new}
> pair instead of the new value only) may be overwhelming compared with its
> potential benefits of reducing the downstream traffic.
>
> Guozhang

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-28 Thread Philippe Derome
Is this point of view consistent with the new ticket 3911 (Enforce KTable
materialisation) just submitted by Eno T.?

Should the two tickets be linked somehow if they are related?

On 27 Jun 2016 5:38 p.m., "Guozhang Wang" wrote:

> My concern is that the overhead of requesting the source KTable to be
> materialized (i.e. creating a state store, and sending the {old -> new}
> pair instead of the new value only) may be overwhelming compared with its
> potential benefits of reducing the downstream traffic.
>
> Guozhang

On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome  wrote:

> Guozhang,
>
> would you say it's advisable to initialize KTableFilter.sendOldValues to
> true instead of false? That's what I see that could trigger your described
> case 3 to potentially desirable effect, but I didn't include it in the pull
> request. If it is left at the default value of false, I don't know what
> mechanism should override it to true.
>
> Phil
>
> On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang wrote:
>
> > Thanks! You can follow this step-by-step guidance to contribute to Kafka
> > via GitHub.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> >
> > Guozhang
> >
> >
> > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome wrote:
> >
> > > I have a one-liner solution for this in KTableFilter.java and about 5-6
> > > lines of changes to the existing unit test
> > > KTableFilterTest.testSendingOldValue. I included those lines with
> > > context in the JIRA. I am struggling a bit with GitHub, being new to it,
> > > and with how to do a proper pull request, so hopefully that can be
> > > followed up by you? I had the streams test suite pass aside from a few
> > > cases that pertain specifically to this JIRA, as assumptions have now
> > > changed.
> > >
> > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang wrote:
> > >
> > > > Hi Philippe,
> > > >
> > > > Great, since you agree with my reasoning, I have created a JIRA ticket
> > > > for optimizing KTableFilter (feel free to pick it up if you are
> > > > interested in contributing):
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > >
> > > > About case 3-c-1), what I meant is that since "predicate return true on
> > > > both", the resulting pair would just be the same as the original pair.
> > > >
> > > > KIP-63 itself is a rather big story, but it has one connection to this
> > > > JIRA: with caching you can dedup some records with the same key. For
> > > > example, if the input records to the KTable are:
> > > >
> > > > , , , , ,  ...
> > > >
> > > > and the KTable is materialized into a state store with a cache on top
> > > > of it, then the resulting downstream could be:
> > > >
> > > >  1}>,  6}> ...
> > > >
> > > > instead of
> > > >
> > > >  1}>,  2}>,  3}>, ...  6}> ...
> > > >
> > > > So if it is piped to a filter() operator, then even less data will be
> > > > produced.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome wrote:
> > > >
> > > > > Yes, it looks very good. Your detailed explanation appears compelling
> > > > > enough to reveal that some of the details of the complexity of a
> > > > > streams system are probably inherent complexity (not that I dared
> > > > > assume it was "easy", but I could afford to be conveniently unaware).
> > > > > It took me 30 minutes to grasp this latest response.
> > > > >
> > > > > There might be a typo in your email for case 3.c.1), as I would think
> > > > > we should send the most recent pair as opposed to the original; in any
> > > > > event, it does not materially impact your presentation.
> > > > >
> > > > > Your case 3a) is really what triggered my line of questioning, and I
> > > > > found the current behaviour vexing, as it may lead to an undesirable
> > > > > but necessary filter (see Michael G. Noll's fix at the very end of
> > > > > UserRegionLambdaExample, which tries to weed out nulls) used when
> > > > > writing output to a topic or the console. Without looking at the
> > > > > design, it seemed self-evident to me that the 3a) behaviour had to be
> > > > > implemented (from my point of view, with the code example I was
> > > > > looking at, it simply means never saying to delete a key that was
> > > > > never created; simply don't "create a deleted" key).
> > > > >
> > > > > Likewise, cases 3 b,c look very reasonable.
> > > > >
> > > > > Just out of curiosity, did you effectively just restate the essence
> > > > > of KIP-63 in more approachable language I could understand, or is
> > > > > KIP-63 really a different beast?
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang wrote:
> > > > >
> > > > > > Hello Philippe,
> > > > > >
> > > > > > Very good points, let me dump my thoughts about 
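The caching effect Guozhang describes for KIP-63 in his quoted 25 Jun message can be sketched as a per-key cache that keeps only the latest unflushed value and, on flush, emits one {old -> new} pair in which old is whatever downstream last saw. DedupCache, the key k and the values 1 to 6 are hypothetical; this is not KIP-63's actual implementation. Six updates to one key collapse into two downstream changes, so a filter() placed after it evaluates two records instead of six.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical dedup cache in front of a materialized KTable.
final class DedupCache {
    private final Map<String, Integer> flushed = new LinkedHashMap<>(); // last value sent downstream
    private final Map<String, Integer> dirty = new LinkedHashMap<>();   // latest unflushed value per key
    final List<String> downstream = new ArrayList<>();

    void put(String key, Integer value) {
        dirty.put(key, value); // overwrites earlier unflushed updates for the same key
    }

    /** Emits one {old -> new} pair per dirty key, where old is what downstream last saw. */
    void flush() {
        for (Map.Entry<String, Integer> e : dirty.entrySet()) {
            Integer old = flushed.put(e.getKey(), e.getValue());
            downstream.add(e.getKey() + ": {" + old + " -> " + e.getValue() + "}");
        }
        dirty.clear();
    }

    public static void main(String[] args) {
        DedupCache cache = new DedupCache();
        cache.put("k", 1);
        cache.flush();
        for (int v = 2; v <= 6; v++) {
            cache.put("k", v);
        }
        cache.flush();
        System.out.println(cache.downstream); // [k: {null -> 1}, k: {1 -> 6}]
    }
}
```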

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Guozhang Wang
The boolean flag can be reset by a child operator that requires the source
to be materialized; more details can be found in this design wiki:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65143671

But just to give you a concrete idea, if your topology is defined as:


KTable table1 = builder.table("topic1");
table1.filter().mapValues().to("topic2");

Then enableSendOldValues will not be set to true and table1 will not be
materialized. If your topology is defined as:

KTable table1 = builder.table("topic1");
KTable table2 = table1.filter().mapValues().groupBy(..).aggregate(..);

Then enableSendOldValues will be called from the aggregate() operator on
its parent, and propagated from parent to parent all the way back to the
source table1 (as in KTableSource).
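A hypothetical sketch of the propagation described above: a child that needs {old -> new} pairs calls enableSendOldValues() on its parent, each operator passes the request upward, and the source reacts by materializing itself. OpNode and SourceOpNode are illustrative names, not Kafka classes, and groupBy/aggregate are collapsed into a single request for brevity. An aggregation needs old values so that it can subtract a key's previous contribution when that key's value changes.

```java
// Hypothetical operator node; not a Kafka class.
class OpNode {
    final String name;
    final OpNode parent; // null for the source table
    boolean sendOldValues = false;

    OpNode(String name, OpNode parent) {
        this.name = name;
        this.parent = parent;
    }

    /** Called by a child that needs {old -> new} pairs; the request walks up to the source. */
    void enableSendOldValues() {
        sendOldValues = true;
        if (parent != null) {
            parent.enableSendOldValues();
        }
    }
}

// Hypothetical source table: it can only supply old values by materializing a store.
class SourceOpNode extends OpNode {
    boolean materialized = false;

    SourceOpNode(String name) {
        super(name, null);
    }

    @Override
    void enableSendOldValues() {
        super.enableSendOldValues();
        materialized = true;
    }
}

class PropagationDemo {
    public static void main(String[] args) {
        // Topology 1: table1.filter().mapValues().to("topic2"); nothing asks for old values.
        SourceOpNode table1 = new SourceOpNode("table1");
        new OpNode("mapValues", new OpNode("filter", table1));
        System.out.println("topology 1, table1 materialized: " + table1.materialized);

        // Topology 2: table1.filter().mapValues().groupBy(..).aggregate(..)
        SourceOpNode table1b = new SourceOpNode("table1");
        OpNode mapValues = new OpNode("mapValues", new OpNode("filter", table1b));
        mapValues.enableSendOldValues(); // the request made on behalf of the aggregation
        System.out.println("topology 2, table1 materialized: " + table1b.materialized);
    }
}
```

This prints `false` for the first topology and `true` for the second, mirroring the two cases above.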


Guozhang



On Mon, Jun 27, 2016 at 3:00 PM, Philippe Derome  wrote:

> Then I don't see any simple solution here at least for a novice, especially
> since I don't know what can trigger the boolean flag to true.

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Philippe Derome
Then I don't see any simple solution here, at least for a novice, especially
since I don't know what can set the boolean flag to true.
On 27 Jun 2016 5:38 p.m., "Guozhang Wang"  wrote:

> My concern is that the overhead of requesting the source KTable to be
> materialized (i.e. creating a state store, and sending the {old -> new}
> pair instead of the new value only) may be overwhelming compared with its
> potential benefits of reducing the downstream traffic.
>
> Guozhang
> > > > > >
> > > > > > Your case 3a) is really what triggered my line of questioning
> and I
> > > > found
> > > > > > the current behaviour vexing as it may lead to some undesirable
> and
> > > > > > necessary filter (see Michael G. Noll's fix in
> > > UserRegionLambdaExample
> > > > at
> > > > > > the very end trying to weed out null) used to output to topic to
> > > > console.
> > > > > > Without looking at design, it seemed self-evident to me that the
> > 3a)
> > > > > > behaviour had to be implemented ( from my point of view with the
> > code
> > > > > > example I was looking at, it simply means never say to delete a
> key
> > > > that
> > > > > > was never created, simply don't "create a deleted" key).
> > > > > >
> > > > > > Likewise cases 3 b,c look very reasonable.
> > > > > >
> > > > > > Just out of curiosity, did you effectively just restate the
> essence
> > > of
> > > > > > KIP-63 in a 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-27 Thread Guozhang Wang
My concern is that the overhead of requesting the source KTable to be
materialized (i.e. creating a state store, and sending the {old -> new}
pair instead of only the new value) may outweigh its potential benefit of
reducing the downstream traffic.
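To make the cost in this concern concrete, here is a minimal sketch of what materializing a source KTable implies. It is a simplified model, not Kafka Streams' actual implementation: the `Change` and `MaterializedSource` classes are hypothetical, and a plain `HashMap` stands in for the state store. Each incoming record costs one store read and one store write so that the {old -> new} pair can be built, and the store holds one entry per live key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical {old -> new} pair; either side may be null.
final class Change<V> {
    final V oldValue;
    final V newValue;

    Change(V oldValue, V newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Change)) {
            return false;
        }
        Change<?> other = (Change<?>) o;
        return Objects.equals(oldValue, other.oldValue)
            && Objects.equals(newValue, other.newValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(oldValue, newValue);
    }

    @Override
    public String toString() {
        return "{" + oldValue + " -> " + newValue + "}";
    }
}

// Simplified model of a materialized source KTable; a HashMap stands in for the state store.
final class MaterializedSource<K, V> {
    private final Map<K, V> store = new HashMap<>();

    // Every record costs one store read and one store write; a null value deletes the key.
    Change<V> process(K key, V newValue) {
        V oldValue = (newValue == null) ? store.remove(key) : store.put(key, newValue);
        return new Change<>(oldValue, newValue);
    }

    // The store keeps one entry per live key, so memory grows with the key space.
    int storedKeys() {
        return store.size();
    }
}
```

In this model, a table that is not materialized can only forward the new value, which is why a filter behind it cannot tell whether a key that now fails the predicate was forwarded earlier.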

Guozhang

On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome  wrote:

> Guozhang,
>
> would you say it's advisable to initialize KTableFilter.sendOldValues to
> true instead of false? That's what I see that can trigger your described
> case 3 to potentially desirable effect, but I didn't include it into pull
> request. If left to default value of false, I don't know what mechanism
> should override it to true.
>
> Phil
>
> On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang 
> wrote:
>
> > Thanks! You can follow this step-by-step guidance to contribute to Kafka
> > via github.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> >
> >
> > Guozhang
> >
> >
> > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome 
> > wrote:
> >
> > > I have a 1 liner solution for this in KTableFilter.java and about 5-6
> > lines
> > > changes to existing unit test KTableFilterTest.testSendingOldValue. I
> > > included those lines with context in the JIRA. I am struggling a bit
> with
> > > github being new to it and how to do a proper pull request so hopefully
> > > that can be followed up by you? I had the streams test suite pass aside
> > for
> > > a few cases that pertain specifically to this JIRA as assumptions have
> > now
> > > changed.
> > >
> > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang 
> > wrote:
> > >
> > > > Hi Philippe,
> > > >
> > > > Great, since you agree with my reasonings, I have created a JIRA
> ticket
> > > for
> > > > optimizing KTableFilter (feel free to pick it up if you are
> interested
> > in
> > > > contributing):
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > >
> > > > About case 3-c-1), what I meant is that since "predicate return true
> on
> > > > both",
> > > > the resulted pair would just be the same as the original pair.
> > > >
> > > > About KIP-63, itself is a rather big story, but it has one
> > correspondence
> > > > to this JIRA: with caching you can dedup some records with the same
> > key,
> > > > for example in the input records to the KTable is:
> > > >
> > > > , , , , ,  ...
> > > >
> > > > And the KTable is materialized into a state store with cache on top
> of
> > > it,
> > > > then the resulted downstream could be:
> > > >
> > > >  1}>,  6}> ...
> > > >
> > > > Instead of
> > > >
> > > >  1}>,  2}>,  3}>, ...  6}> ...
> > > >
> > > > So if it is piped to a filter() operator, then even less data will be
> > > > produced.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome  >
> > > > wrote:
> > > >
> > > > > Yes, it looks very good. Your detailed explanation appears
> compelling
> > > > > enough to reveal that some of the details of the complexity of a
> > > streams
> > > > > system are probably inherent complexity (not that I dared assume it
> > was
> > > > > "easy" but I could afford to be conveniently unaware). It took me
> 30
> > > > > minutes to grasp this latest response.
> > > > >
> > > > > There might be a typo in your email for case 3.c.1) as I would
> think
> > we
> > > > > should send the most recent pair as opposed to original, in any
> event
> > > it
> > > > > does not materially impact your presentation.
> > > > >
> > > > > Your case 3a) is really what triggered my line of questioning and I
> > > found
> > > > > the current behaviour vexing as it may lead to some undesirable and
> > > > > necessary filter (see Michael G. Noll's fix in
> > UserRegionLambdaExample
> > > at
> > > > > the very end trying to weed out null) used to output to topic to
> > > console.
> > > > > Without looking at design, it seemed self-evident to me that the
> 3a)
> > > > > behaviour had to be implemented ( from my point of view with the
> code
> > > > > example I was looking at, it simply means never say to delete a key
> > > that
> > > > > was never created, simply don't "create a deleted" key).
> > > > >
> > > > > Likewise cases 3 b,c look very reasonable.
> > > > >
> > > > > Just out of curiosity, did you effectively just restate the essence
> > of
> > > > > KIP-63 in a more approachable language I could understand or is
> > KIP-63
> > > > > really a different beast?
> > > > >
> > > > >
> > > > >
> > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang  >
> > > > wrote:
> > > > >
> > > > > > Hello Philippe,
> > > > > >
> > > > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > > > specifically and how we can improve on that:
> > > > > >
> > > > > > 1. Some context: when a KTable participates in a downstream
> > operators
> > > > 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-26 Thread Philippe Derome
Guozhang,

would you say it's advisable to initialize KTableFilter.sendOldValues to
true instead of false? That is what I see that could trigger your described
case 3 to potentially desirable effect, but I didn't include it in the pull
request. If it is left at its default value of false, I don't know what
mechanism should override it to true.
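One way to picture what could set the flag is the simplified model below: a downstream operator that needs old values (such as an aggregation) requests them from its parent, and the request propagates upstream until the source table is forced to materialize. As far as I understand, Kafka Streams' internal KTable processor suppliers expose an `enableSendingOldValues()` hook that works along these lines, but the `TableNode`, `SourceNode`, `FilterNode` and `AggregateNode` classes here are hypothetical and are not Kafka's code.

```java
// Hypothetical, simplified model of how a "send old values" request travels upstream.
abstract class TableNode {
    final TableNode parent; // null for a source table
    boolean sendOldValues = false;

    TableNode(TableNode parent) {
        this.parent = parent;
    }

    // Called by a downstream operator that needs {old -> new} pairs.
    void enableSendingOldValues() {
        if (sendOldValues) {
            return; // already enabled, nothing more to propagate
        }
        sendOldValues = true;
        if (parent != null) {
            parent.enableSendingOldValues(); // the parent must supply old values too
        }
    }
}

final class SourceNode extends TableNode {
    boolean materialized = false;

    SourceNode() {
        super(null);
    }

    @Override
    void enableSendingOldValues() {
        materialized = true; // old values can only come from a state store
        super.enableSendingOldValues();
    }
}

final class FilterNode extends TableNode {
    FilterNode(TableNode parent) {
        super(parent);
    }
}

// An aggregation needs old values to retract them, so it asks its parent when it is created.
final class AggregateNode extends TableNode {
    AggregateNode(TableNode parent) {
        super(parent);
        parent.enableSendingOldValues();
    }
}
```

Under this model the filter's flag stays false unless something downstream asks for old values. Setting it to true up front would only be meaningful if the source were also materialized to supply those old values, which is exactly the overhead Guozhang is worried about.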

Phil

On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang  wrote:

> Thanks! You can follow this step-by-step guidance to contribute to Kafka
> via github.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
>
>
> Guozhang
>
>
> On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome 
> wrote:
>
> > I have a 1 liner solution for this in KTableFilter.java and about 5-6
> lines
> > changes to existing unit test KTableFilterTest.testSendingOldValue. I
> > included those lines with context in the JIRA. I am struggling a bit with
> > github being new to it and how to do a proper pull request so hopefully
> > that can be followed up by you? I had the streams test suite pass aside
> for
> > a few cases that pertain specifically to this JIRA as assumptions have
> now
> > changed.
> >
> > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang 
> wrote:
> >
> > > Hi Philippe,
> > >
> > > Great, since you agree with my reasonings, I have created a JIRA ticket
> > for
> > > optimizing KTableFilter (feel free to pick it up if you are interested
> in
> > > contributing):
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-3902
> > >
> > > About case 3-c-1), what I meant is that since "predicate return true on
> > > both",
> > > the resulted pair would just be the same as the original pair.
> > >
> > > About KIP-63, itself is a rather big story, but it has one
> correspondence
> > > to this JIRA: with caching you can dedup some records with the same
> key,
> > > for example in the input records to the KTable is:
> > >
> > > , , , , ,  ...
> > >
> > > And the KTable is materialized into a state store with cache on top of
> > it,
> > > then the resulted downstream could be:
> > >
> > >  1}>,  6}> ...
> > >
> > > Instead of
> > >
> > >  1}>,  2}>,  3}>, ...  6}> ...
> > >
> > > So if it is piped to a filter() operator, then even less data will be
> > > produced.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome 
> > > wrote:
> > >
> > > > Yes, it looks very good. Your detailed explanation appears compelling
> > > > enough to reveal that some of the details of the complexity of a
> > streams
> > > > system are probably inherent complexity (not that I dared assume it
> was
> > > > "easy" but I could afford to be conveniently unaware). It took me 30
> > > > minutes to grasp this latest response.
> > > >
> > > > There might be a typo in your email for case 3.c.1) as I would think
> we
> > > > should send the most recent pair as opposed to original, in any event
> > it
> > > > does not materially impact your presentation.
> > > >
> > > > Your case 3a) is really what triggered my line of questioning and I
> > found
> > > > the current behaviour vexing as it may lead to some undesirable and
> > > > necessary filter (see Michael G. Noll's fix in
> UserRegionLambdaExample
> > at
> > > > the very end trying to weed out null) used to output to topic to
> > console.
> > > > Without looking at design, it seemed self-evident to me that the 3a)
> > > > behaviour had to be implemented ( from my point of view with the code
> > > > example I was looking at, it simply means never say to delete a key
> > that
> > > > was never created, simply don't "create a deleted" key).
> > > >
> > > > Likewise cases 3 b,c look very reasonable.
> > > >
> > > > Just out of curiosity, did you effectively just restate the essence
> of
> > > > KIP-63 in a more approachable language I could understand or is
> KIP-63
> > > > really a different beast?
> > > >
> > > >
> > > >
> > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang 
> > > wrote:
> > > >
> > > > > Hello Philippe,
> > > > >
> > > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > > specifically and how we can improve on that:
> > > > >
> > > > > 1. Some context: when a KTable participates in a downstream
> operators
> > > > (e.g.
> > > > > if that operator is an aggregation), then we need to materialize
> this
> > > > > KTable and send both its old value as well as new value as a pair
> > {old
> > > ->
> > > > > new} to the downstream operator. In practice it usually needs to
> send
> > > the
> > > > > pair.
> > > > >
> > > > > So let's discuss about them separately, take the following example
> > > source
> > > > > stream for your KTable
> > > > >
> > > > > , ,  ...
> > > > >
> > > > > When the KTable needs to be materialized, it will transform the
> > source
> > > > > messages into the pairs of:
> > > > >
> > > > >  1}>,  2}>,  3}>
> > > > >
> > > > > 2. If 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-25 Thread Guozhang Wang
Thanks! You can follow this step-by-step guide to contribute to Kafka
via GitHub.

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest


Guozhang


On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome  wrote:

> I have a 1 liner solution for this in KTableFilter.java and about 5-6 lines
> changes to existing unit test KTableFilterTest.testSendingOldValue. I
> included those lines with context in the JIRA. I am struggling a bit with
> github being new to it and how to do a proper pull request so hopefully
> that can be followed up by you? I had the streams test suite pass aside for
> a few cases that pertain specifically to this JIRA as assumptions have now
> changed.
>
> On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang  wrote:
>
> > Hi Philippe,
> >
> > Great, since you agree with my reasonings, I have created a JIRA ticket
> for
> > optimizing KTableFilter (feel free to pick it up if you are interested in
> > contributing):
> >
> > https://issues.apache.org/jira/browse/KAFKA-3902
> >
> > About case 3-c-1), what I meant is that since "predicate return true on
> > both",
> > the resulted pair would just be the same as the original pair.
> >
> > About KIP-63, itself is a rather big story, but it has one correspondence
> > to this JIRA: with caching you can dedup some records with the same key,
> > for example in the input records to the KTable is:
> >
> > , , , , ,  ...
> >
> > And the KTable is materialized into a state store with cache on top of
> it,
> > then the resulted downstream could be:
> >
> >  1}>,  6}> ...
> >
> > Instead of
> >
> >  1}>,  2}>,  3}>, ...  6}> ...
> >
> > So if it is piped to a filter() operator, then even less data will be
> > produced.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome 
> > wrote:
> >
> > > Yes, it looks very good. Your detailed explanation appears compelling
> > > enough to reveal that some of the details of the complexity of a
> streams
> > > system are probably inherent complexity (not that I dared assume it was
> > > "easy" but I could afford to be conveniently unaware). It took me 30
> > > minutes to grasp this latest response.
> > >
> > > There might be a typo in your email for case 3.c.1) as I would think we
> > > should send the most recent pair as opposed to original, in any event
> it
> > > does not materially impact your presentation.
> > >
> > > Your case 3a) is really what triggered my line of questioning and I
> found
> > > the current behaviour vexing as it may lead to some undesirable and
> > > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample
> at
> > > the very end trying to weed out null) used to output to topic to
> console.
> > > Without looking at design, it seemed self-evident to me that the 3a)
> > > behaviour had to be implemented ( from my point of view with the code
> > > example I was looking at, it simply means never say to delete a key
> that
> > > was never created, simply don't "create a deleted" key).
> > >
> > > Likewise cases 3 b,c look very reasonable.
> > >
> > > Just out of curiosity, did you effectively just restate the essence of
> > > KIP-63 in a more approachable language I could understand or is KIP-63
> > > really a different beast?
> > >
> > >
> > >
> > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Hello Philippe,
> > > >
> > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > specifically and how we can improve on that:
> > > >
> > > > 1. Some context: when a KTable participates in a downstream operators
> > > (e.g.
> > > > if that operator is an aggregation), then we need to materialize this
> > > > KTable and send both its old value as well as new value as a pair
> {old
> > ->
> > > > new} to the downstream operator. In practice it usually needs to send
> > the
> > > > pair.
> > > >
> > > > So let's discuss about them separately, take the following example
> > source
> > > > stream for your KTable
> > > >
> > > > , ,  ...
> > > >
> > > > When the KTable needs to be materialized, it will transform the
> source
> > > > messages into the pairs of:
> > > >
> > > >  1}>,  2}>,  3}>
> > > >
> > > > 2. If "send old value" is not enabled, then when the filter predicate
> > > > returns false, we MUST send a  to the downstream operator
> to
> > > > indicate that this key is being filtered in the table. Otherwise, for
> > > > example if your filter is "value < 2", then the updated value 
> > will
> > > > just be filtered, resulting in incorrect semantics.
> > > >
> > > > If it returns true we should still send the original  to
> > > > downstream operators.
> > > >
> > > > 3. If "send old value" is enabled, then there are a couple of cases
> we
> > > can
> > > > consider:
> > > >
> > > > a. If old value is  and new value is ,
> > and
> > > > the filter predicate return false for the 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-25 Thread Philippe Derome
I have a one-line solution for this in KTableFilter.java and about 5-6 lines
of changes to the existing unit test KTableFilterTest.testSendingOldValue. I
included those lines with context in the JIRA. I am struggling a bit with
GitHub, being new to it, and with how to do a proper pull request, so
hopefully you can follow that up? The streams test suite passes, aside from
a few cases that pertain specifically to this JIRA, since their assumptions
have now changed.

On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang  wrote:

> Hi Philippe,
>
> Great, since you agree with my reasonings, I have created a JIRA ticket for
> optimizing KTableFilter (feel free to pick it up if you are interested in
> contributing):
>
> https://issues.apache.org/jira/browse/KAFKA-3902
>
> About case 3-c-1), what I meant is that since "predicate return true on
> both",
> the resulted pair would just be the same as the original pair.
>
> About KIP-63, itself is a rather big story, but it has one correspondence
> to this JIRA: with caching you can dedup some records with the same key,
> for example in the input records to the KTable is:
>
> , , , , ,  ...
>
> And the KTable is materialized into a state store with cache on top of it,
> then the resulted downstream could be:
>
>  1}>,  6}> ...
>
> Instead of
>
>  1}>,  2}>,  3}>, ...  6}> ...
>
> So if it is piped to a filter() operator, then even less data will be
> produced.
>
>
> Guozhang
>
>
> On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome 
> wrote:
>
> > Yes, it looks very good. Your detailed explanation appears compelling
> > enough to reveal that some of the details of the complexity of a streams
> > system are probably inherent complexity (not that I dared assume it was
> > "easy" but I could afford to be conveniently unaware). It took me 30
> > minutes to grasp this latest response.
> >
> > There might be a typo in your email for case 3.c.1) as I would think we
> > should send the most recent pair as opposed to original, in any event it
> > does not materially impact your presentation.
> >
> > Your case 3a) is really what triggered my line of questioning and I found
> > the current behaviour vexing as it may lead to some undesirable and
> > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at
> > the very end trying to weed out null) used to output to topic to console.
> > Without looking at design, it seemed self-evident to me that the 3a)
> > behaviour had to be implemented ( from my point of view with the code
> > example I was looking at, it simply means never say to delete a key that
> > was never created, simply don't "create a deleted" key).
> >
> > Likewise cases 3 b,c look very reasonable.
> >
> > Just out of curiosity, did you effectively just restate the essence of
> > KIP-63 in a more approachable language I could understand or is KIP-63
> > really a different beast?
> >
> >
> >
> > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang 
> wrote:
> >
> > > Hello Philippe,
> > >
> > > Very good points, let me dump my thoughts about "KTable.filter"
> > > specifically and how we can improve on that:
> > >
> > > 1. Some context: when a KTable participates in a downstream operators
> > (e.g.
> > > if that operator is an aggregation), then we need to materialize this
> > > KTable and send both its old value as well as new value as a pair {old
> ->
> > > new} to the downstream operator. In practice it usually needs to send
> the
> > > pair.
> > >
> > > So let's discuss about them separately, take the following example
> source
> > > stream for your KTable
> > >
> > > , ,  ...
> > >
> > > When the KTable needs to be materialized, it will transform the source
> > > messages into the pairs of:
> > >
> > >  1}>,  2}>,  3}>
> > >
> > > 2. If "send old value" is not enabled, then when the filter predicate
> > > returns false, we MUST send a  to the downstream operator to
> > > indicate that this key is being filtered in the table. Otherwise, for
> > > example if your filter is "value < 2", then the updated value 
> will
> > > just be filtered, resulting in incorrect semantics.
> > >
> > > If it returns true we should still send the original  to
> > > downstream operators.
> > >
> > > 3. If "send old value" is enabled, then there are a couple of cases we
> > can
> > > consider:
> > >
> > > a. If old value is  and new value is ,
> and
> > > the filter predicate return false for the new value, then in this case
> it
> > > is safe to optimize and not returning anything to the downstream
> > operator,
> > > since in this case we know there is no value for the key previously
> > > anyways; otherwise we send the original pair.
> > >
> > > b. If old value is  and new value is ,
> > > indicating to delete this key, and the filter predicate return false
> for
> > > the old value, then in this case it is safe to optimize and not
> returning
> > > anything to the downstream operator, since we know that the old value
> has
> > > 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-24 Thread Guozhang Wang
Hi Philippe,

Great, since you agree with my reasoning, I have created a JIRA ticket for
optimizing KTableFilter (feel free to pick it up if you are interested in
contributing):

https://issues.apache.org/jira/browse/KAFKA-3902

About case 3-c-1), what I meant is that since the "predicate returns true on
both", the resulting pair would just be the same as the original pair.

About KIP-63: it is a rather big story in itself, but it has one connection
to this JIRA. With caching you can dedup some records with the same key;
for example, if the input records to the KTable are:

, , , , ,  ...

and the KTable is materialized into a state store with a cache on top of
it, then the resulting downstream records could be:

 1}>,  6}> ...

Instead of

 1}>,  2}>,  3}>, ...  6}> ...

So if they are piped to a filter() operator, even less data will be
produced.
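A minimal sketch of the deduplication effect described here, assuming a cache that keeps only the latest pending value per key and forwards one {old -> new} pair per dirty key when flushed. The `DedupCache` and `KeyedChange` classes and the explicit `flush()` calls are hypothetical illustrations, not KIP-63's actual design or API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed {old -> new} pair forwarded downstream on flush.
final class KeyedChange<K, V> {
    final K key;
    final V oldValue;
    final V newValue;

    KeyedChange(K key, V oldValue, V newValue) {
        this.key = key;
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    @Override
    public String toString() {
        return key + ": {" + oldValue + " -> " + newValue + "}";
    }
}

// Simplified model of a cache in front of a state store (an assumption, not KIP-63's design).
final class DedupCache<K, V> {
    private final Map<K, V> store = new HashMap<>();        // last flushed value per key
    private final Map<K, V> dirty = new LinkedHashMap<>();  // latest pending value per key

    // A later update for the same key overwrites the pending one instead of being forwarded.
    void put(K key, V value) {
        dirty.put(key, value);
    }

    // Forwards one pair per dirty key: {last flushed value -> latest pending value}.
    List<KeyedChange<K, V>> flush() {
        List<KeyedChange<K, V>> forwarded = new ArrayList<>();
        for (Map.Entry<K, V> entry : dirty.entrySet()) {
            K key = entry.getKey();
            V newValue = entry.getValue();
            V oldValue = (newValue == null) ? store.remove(key) : store.put(key, newValue);
            forwarded.add(new KeyedChange<>(key, oldValue, newValue));
        }
        dirty.clear();
        return forwarded;
    }
}
```

Fed the six updates a: 1 through a: 6, with a flush after the first and after the last, this sketch forwards two pairs, a: {null -> 1} and a: {1 -> 6}, instead of six; a filter() placed after the cache would then evaluate its predicate on only those two.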


Guozhang


On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome  wrote:

> Yes, it looks very good. Your detailed explanation appears compelling
> enough to reveal that some of the details of the complexity of a streams
> system are probably inherent complexity (not that I dared assume it was
> "easy" but I could afford to be conveniently unaware). It took me 30
> minutes to grasp this latest response.
>
> There might be a typo in your email for case 3.c.1) as I would think we
> should send the most recent pair as opposed to original, in any event it
> does not materially impact your presentation.
>
> Your case 3a) is really what triggered my line of questioning and I found
> the current behaviour vexing as it may lead to some undesirable and
> necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at
> the very end trying to weed out null) used to output to topic to console.
> Without looking at design, it seemed self-evident to me that the 3a)
> behaviour had to be implemented ( from my point of view with the code
> example I was looking at, it simply means never say to delete a key that
> was never created, simply don't "create a deleted" key).
>
> Likewise cases 3 b,c look very reasonable.
>
> Just out of curiosity, did you effectively just restate the essence of
> KIP-63 in a more approachable language I could understand or is KIP-63
> really a different beast?
>
>
>
> On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang  wrote:
>
> > Hello Philippe,
> >
> > Very good points, let me dump my thoughts about "KTable.filter"
> > specifically and how we can improve on that:
> >
> > 1. Some context: when a KTable participates in a downstream operators
> (e.g.
> > if that operator is an aggregation), then we need to materialize this
> > KTable and send both its old value as well as new value as a pair {old ->
> > new} to the downstream operator. In practice it usually needs to send the
> > pair.
> >
> > So let's discuss about them separately, take the following example source
> > stream for your KTable
> >
> > , ,  ...
> >
> > When the KTable needs to be materialized, it will transform the source
> > messages into the pairs of:
> >
> >  1}>,  2}>,  3}>
> >
> > 2. If "send old value" is not enabled, then when the filter predicate
> > returns false, we MUST send a  to the downstream operator to
> > indicate that this key is being filtered in the table. Otherwise, for
> > example if your filter is "value < 2", then the updated value  will
> > just be filtered, resulting in incorrect semantics.
> >
> > If it returns true we should still send the original  to
> > downstream operators.
> >
> > 3. If "send old value" is enabled, then there are a couple of cases we
> can
> > consider:
> >
> > a. If old value is  and new value is , and
> > the filter predicate return false for the new value, then in this case it
> > is safe to optimize and not returning anything to the downstream
> operator,
> > since in this case we know there is no value for the key previously
> > anyways; otherwise we send the original pair.
> >
> > b. If old value is  and new value is ,
> > indicating to delete this key, and the filter predicate return false for
> > the old value, then in this case it is safe to optimize and not returning
> > anything to the downstream operator, since we know that the old value has
> > already been filtered in a previous message; otherwise we send the
> original
> > pair.
> >
> > c. If both old and new values are not null, and:
> >
> >
> >   1) predicate return true on both, send the original pair;
> >
> >   2) predicate return false on both, we can optimize and do not send
> > anything;
> >
> >   3) predicate return true on old and false on new, send the key: {old ->
> > null};
> >
> >   4) predicate return false on old and true on new, send the key: {null
> ->
> > new};
> >
> > Does this sounds good to you?
> >
> >
> > Guozhang
> >
> >
> > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome 
> > wrote:
> >
> > > Thanks a lot for the detailed feedback, its clarity and the reference
> to
> > > KIP-63, which 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-24 Thread Philippe Derome
Yes, it looks very good. Your detailed explanation is compelling enough to
reveal that some of the complexity of a streams system is probably inherent
(not that I dared assume it was "easy", but I could afford to be
conveniently unaware). It took me 30 minutes to grasp this latest response.

There might be a typo in your email for case 3.c.1), as I would think we
should send the most recent pair as opposed to the original; in any event
it does not materially impact your presentation.

Your case 3a) is really what triggered my line of questioning, and I found
the current behaviour vexing, as it may lead to an undesirable but
necessary filter (see Michael G. Noll's fix at the very end of
UserRegionLambdaExample, which weeds out nulls) used when writing the
output to a topic and to the console. Without looking at the design, it
seemed self-evident to me that the 3a) behaviour had to be implemented
(from my point of view, with the code example I was looking at, it simply
means never saying to delete a key that was never created; simply don't
"create a deleted" key).

Likewise, cases 3b) and 3c) look very reasonable.
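The filtering rules in Guozhang's case analysis (quoted in this message) can be sketched as follows. This is a simplified, hypothetical model (`Change` and `FilterSketch` are illustrative names, not the KTableFilter code committed for KAFKA-3902): a value failing the predicate is treated as null, and when old values are sent, a pair whose two sides both end up null is dropped.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical {old -> new} pair; either side may be null.
final class Change<V> {
    final V oldValue;
    final V newValue;

    Change(V oldValue, V newValue) {
        this.oldValue = oldValue;
        this.newValue = newValue;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Change)) {
            return false;
        }
        Change<?> other = (Change<?>) o;
        return Objects.equals(oldValue, other.oldValue)
            && Objects.equals(newValue, other.newValue);
    }

    @Override
    public int hashCode() {
        return Objects.hash(oldValue, newValue);
    }
}

// Simplified, hypothetical model of the KTable filter rules discussed in this thread.
final class FilterSketch<V> {
    private final Predicate<V> predicate;
    private final boolean sendOldValues;

    FilterSketch(Predicate<V> predicate, boolean sendOldValues) {
        this.predicate = predicate;
        this.sendOldValues = sendOldValues;
    }

    // Returns the pair to forward downstream, or Optional.empty() when nothing needs to be sent.
    Optional<Change<V>> process(Change<V> change) {
        V newValue = filtered(change.newValue);
        if (!sendOldValues) {
            // Case 2: without the old value we cannot know whether this key was forwarded
            // before, so a failing predicate must still be sent as a null (delete).
            return Optional.of(new Change<>(null, newValue));
        }
        V oldValue = filtered(change.oldValue);
        if (oldValue == null && newValue == null) {
            // Cases 3a, 3b and 3c-2: downstream holds no value for this key before or after.
            return Optional.empty();
        }
        // Cases 3c-1, 3c-3, 3c-4, and the non-optimized branches of 3a and 3b.
        return Optional.of(new Change<>(oldValue, newValue));
    }

    // A value that fails the predicate is treated as absent (null).
    private V filtered(V value) {
        return (value != null && predicate.test(value)) ? value : null;
    }
}
```

Regarding 3.c.1): the incoming pair already carries the most recent value as its new side, so forwarding "the original pair" and forwarding "the most recent pair" amount to the same thing in this model.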

Just out of curiosity, did you effectively just restate the essence of
KIP-63 in more approachable language I could understand, or is KIP-63
really a different beast?



On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang  wrote:

> Hello Philippe,
>
> Very good points; let me dump my thoughts about "KTable.filter"
> specifically and how we can improve on it:
>
> 1. Some context: when a KTable participates in a downstream operator (e.g.
> if that operator is an aggregation), then we need to materialize this
> KTable and send both its old value and its new value as a pair {old ->
> new} to the downstream operator. In practice it usually needs to send the
> pair.
>
> So let's discuss them separately. Take the following example source
> stream for your KTable:
>
> , ,  ...
>
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs:
>
>  1}>,  2}>,  3}>
>
> 2. If "send old value" is not enabled, then when the filter predicate
> returns false, we MUST send a  to the downstream operator to
> indicate that this key has been filtered out of the table. Otherwise, for
> example if your filter is "value < 2", then the updated value  will
> just be filtered, resulting in incorrect semantics.
>
> If it returns true we should still send the original  to
> downstream operators.
>
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
>
> a. If old value is  and new value is , and
> the filter predicate returns false for the new value, then in this case it
> is safe to optimize and not send anything to the downstream operator,
> since we know there was no value for the key previously anyway;
> otherwise we send the original pair.
>
> b. If old value is  and new value is ,
> indicating to delete this key, and the filter predicate returns false for
> the old value, then in this case it is safe to optimize and not send
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
>
> c. If both old and new values are not null, and:
>
>   1) the predicate returns true on both, send the original pair;
>
>   2) the predicate returns false on both, we can optimize and not send
> anything;
>
>   3) the predicate returns true on old and false on new, send the key:
> {old -> null};
>
>   4) the predicate returns false on old and true on new, send the key:
> {null -> new};
>
> Does this sound good to you?
>
>
> Guozhang
>
>
> On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome 
> wrote:
>
> > Thanks a lot for the detailed feedback, its clarity and the reference to
> > KIP-63, which however is for the most part above my head for now.
> >
> > Having said that, I still hold the view that the behaviour I presented is
> > undesirable and hard to defend. We may have no choice but to agree to
> > disagree; it could be a sterile discussion to keep at it, and addressing
> > KIP-63 and other issues is more important than my brief observation.
> >
> > What follows supports my point of view that the filter method is not
> > behaving as expected, and I still think it's a defect; however, I am
> > guarded in my observation, admitting my status as a "total newbie" at
> > stream processing and Kafka.
> >
> > If we rewrite the code snippet I provided from
> >
> > KTable<String, String> regionCounts = userRegions
> >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >     .count("CountsByRegion")
> >     .filter((regionName, count) -> false)
> >     .mapValues(count -> count.toString());
> >
> > to
> >
> >
> > KTable<String, Long> regionCounts1 = userRegions
> >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >     .count("CountsByRegion");
> >
> > KTable

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-24 Thread Guozhang Wang
Hello Philippe,

Very good points. Let me dump my thoughts about "KTable.filter"
specifically and how we can improve on it:

1. Some context: when a KTable participates in a downstream operator (e.g.
if that operator is an aggregation), then we need to materialize this
KTable and send both its old value and its new value as a pair {old -> new}
to the downstream operator. In practice it usually needs to send the pair.

So let's discuss them separately. Take the following example source
stream for your KTable:

, ,  ...

When the KTable needs to be materialized, it will transform the source
messages into the pairs of:

 1}>,  2}>,  3}>

2. If "send old value" is not enabled, then when the filter predicate
returns false, we MUST send a (key, null) record to the downstream operator
to indicate that this key is being filtered out of the table. Otherwise,
for example if your filter is "value < 2", an updated value that no longer
passes the predicate would just be filtered, leaving the key's previous
value in place downstream and resulting in incorrect semantics.

If it returns true, we should still send the original (key, value) record
to downstream operators.

3. If "send old value" is enabled, then there are a couple of cases we can
consider:

a. If the old value is null and the new value is not null, and the filter
predicate returns false for the new value, then it is safe to optimize and
not send anything to the downstream operator, since we know there was no
previous value for the key anyway; otherwise we send the original pair.

b. If the old value is not null and the new value is null, indicating a
delete of this key, and the filter predicate returns false for the old
value, then it is safe to optimize and not send anything to the downstream
operator, since we know that the old value was already filtered out by a
previous message; otherwise we send the original pair.

c. If both the old and new values are not null, and:

  1) the predicate returns true on both, send the original pair;

  2) the predicate returns false on both, we can optimize and not send
  anything;

  3) the predicate returns true on old and false on new, send key: {old -> null};

  4) the predicate returns false on old and true on new, send key: {null -> new}.
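The three points above can be modelled in a few lines of plain Java. This is a hypothetical sketch, not Kafka's KTableFilter code or any Kafka Streams API: the class KTableFilterSketch, its Change type and the keys and values used in main are made up for illustration, and a null value stands for "no value for this key".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified model of the filter semantics discussed above.
// No Kafka classes are used; a null value means "no value for this key".
final class KTableFilterSketch {

    /** A forwarded update {old -> new} for one key. */
    static final class Change {
        final String key;
        final Integer oldValue;
        final Integer newValue;

        Change(String key, Integer oldValue, Integer newValue) {
            this.key = key;
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return key + ": {" + oldValue + " -> " + newValue + "}";
        }
    }

    /** Point 1: a materialized table remembers the latest value to build {old -> new} pairs. */
    static List<Change> materialize(String[] keys, Integer[] values) {
        Map<String, Integer> store = new HashMap<>(); // stands in for the state store
        List<Change> changes = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Integer newValue = values[i];
            // Map.put/remove return the previous value, or null if there was none.
            Integer oldValue = newValue == null ? store.remove(keys[i]) : store.put(keys[i], newValue);
            changes.add(new Change(keys[i], oldValue, newValue));
        }
        return changes;
    }

    /** Point 2: without old values, a rejected update must become a (key, null) record. */
    static Change filterWithoutOldValues(Change in, Predicate<Integer> predicate) {
        boolean passes = in.newValue != null && predicate.test(in.newValue);
        return new Change(in.key, null, passes ? in.newValue : null);
    }

    /** Point 3: with old values, some updates can be dropped entirely (cases a, b, c.2). */
    static Optional<Change> filterWithOldValues(Change in, Predicate<Integer> predicate) {
        boolean oldPasses = in.oldValue != null && predicate.test(in.oldValue);
        boolean newPasses = in.newValue != null && predicate.test(in.newValue);
        if (!oldPasses && !newPasses) {
            return Optional.empty(); // the key was hidden before and stays hidden
        }
        // Cases c.1, c.3, c.4 (and a/b when the non-null side passes):
        // any side that fails the predicate is replaced by null.
        return Optional.of(new Change(in.key,
                oldPasses ? in.oldValue : null,
                newPasses ? in.newValue : null));
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2;
        for (Change c : materialize(new String[] {"a", "b", "a"}, new Integer[] {1, 2, 3})) {
            System.out.println(c + "  without old values: " + filterWithoutOldValues(c, lessThanTwo)
                    + "  with old values: " + filterWithOldValues(c, lessThanTwo));
        }
    }
}
```

With the "value < 2" predicate, the update of a from 1 to 3 becomes a (key, null) record when old values are not sent and {1 -> null} when they are, while the update for b, which never passed the filter, is dropped entirely only in the second mode.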

Does this sound good to you?


Guozhang


On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome  wrote:

> Thanks a lot for the detailed feedback, its clarity and the reference to
> KIP-63, which however is for the most part above my head for now.
>
> Having said that, I still hold the view that the behaviour I presented is
> undesirable and hard to defend. We may have no choice but to agree to
> disagree; it could be a sterile discussion to keep at it, and addressing
> KIP-63 and other issues is more important than my brief observation.
>
> What follows supports my point of view that the filter method is not
> behaving as expected, and I still think it's a defect; however, I am
> guarded in my observation, admitting my status as a "total newbie" at
> stream processing and Kafka.
>
> If we rewrite the code snippet I provided from
>
> KTable<String, String> regionCounts = userRegions
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     .filter((regionName, count) -> false)
>     .mapValues(count -> count.toString());
>
> to
>
>
> KTable<String, Long> regionCounts1 = userRegions
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion");
>
> KTable<String, String> regionCounts = regionCounts1
>     .filter((regionName, count) -> false)
>     .mapValues(count -> count.toString());
>
>
> It becomes clear that regionCounts1 could build up plenty of keys with
> valid Long counts, which is normal behaviour (I think you call this a node
> in the topology in KIP-63, and regionCounts is a successor node).
>
> These regionCounts1 keys are then exposed as input to the evaluation of
> KTable regionCounts. But why should any key be created in KTable
> regionCounts for which the filter is false? In other words, the
> "optimization" seems really compelling here: do not create a key before
> that key becomes relevant. The key with a null value is valid and relevant
> in regionCounts1 but not in regionCounts. By a programming composition
> argument, the original block of code I presented should be equivalent to
> the version broken down into two blocks here (and I guess that's saying
> that 1 unified node in the topology should be equivalent to the chain of 2
> nodes represented above, if I understand the terminology right).
>
> The contents of regionCounts should not change depending on the set of
> keys present in regionCounts1 if we view this from a functional
> programming point of view (it's as if we are carrying garbage-collected
> objects into regionCounts), which seems natural considering that the
> filter method is pervasive in FP.
>
> Here regionCounts is totally oblivious to the fact that aggregation took
> place previously in regionCounts1, and that's fine (KIP-63 talks a lot
> about aggregation but I

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-23 Thread Philippe Derome
Thanks a lot for the detailed feedback, its clarity and the reference to
KIP-63, which however is for the most part above my head for now.

Having said that, I still hold the view that the behaviour I presented is
undesirable and hard to defend. We may have no choice but to agree to
disagree; it could be a sterile discussion to keep at it, and addressing
KIP-63 and other issues is more important than my brief observation.

What follows supports my point of view that the filter method is not
behaving as expected, and I still think it's a defect; however, I am
guarded in my observation, admitting my status as a "total newbie" at
stream processing and Kafka.

If we rewrite the code snippet I provided from

KTable<String, String> regionCounts = userRegions
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion")
    .filter((regionName, count) -> false)
    .mapValues(count -> count.toString());

to


KTable<String, Long> regionCounts1 = userRegions
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion");

KTable<String, String> regionCounts = regionCounts1
    .filter((regionName, count) -> false)
    .mapValues(count -> count.toString());


It becomes clear that regionCounts1 could build up plenty of keys with
valid Long counts, which is normal behaviour (I think you call this a node
in the topology in KIP-63, and regionCounts is a successor node).

These regionCounts1 keys are then exposed as input to the evaluation of
KTable regionCounts. But why should any key be created in KTable
regionCounts for which the filter is false? In other words, the
"optimization" seems really compelling here: do not create a key before
that key becomes relevant. The key with a null value is valid and relevant
in regionCounts1 but not in regionCounts. By a programming composition
argument, the original block of code I presented should be equivalent to
the version broken down into two blocks here (and I guess that's saying
that 1 unified node in the topology should be equivalent to the chain of 2
nodes represented above, if I understand the terminology right).

The contents of regionCounts should not change depending on the set of
keys present in regionCounts1 if we view this from a functional
programming point of view (it's as if we are carrying garbage-collected
objects into regionCounts), which seems natural considering that the
filter method is pervasive in FP.

Here regionCounts is totally oblivious to the fact that aggregation took
place previously in regionCounts1, and that's fine (KIP-63 talks a lot
about aggregation, but I don't really care about that; I care about the
2nd node and the behaviour of filter).


On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang  wrote:

> Hello Philippe,
>
> I think your question really has two parts:
>
> 1. What is the semantic difference between a KTable and a KStream, and more
> specifically, how should we interpret (key, null) in a KTable?
>
> You can find some explanations in this documentation:
>
> http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
>
> Note that a KTable is itself still a stream behind the scenes, although it
> may be materialized when necessary. Specifically to your question, (key,
> null) can be treated as a tombstone for the specified key, and when this
> KTable stream is materialized, it results in a "delete" on the
> materialized view.
>
>
> 2. As for the "filter" operator, yes, it will generate a large number of
> (key, null) records, which indicate "delete" in the resulting KTable and
> hence cause heavy traffic to the piped topic. But we are working on KIP-63,
> which unifies the caching mechanism, including in the `KTable.to`
> operator, so that de-duping can be done in this operator and the outgoing
> traffic can be largely reduced:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
>
>
> Guozhang
>
>
> On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome 
> wrote:
>
> > I made a modification of Confluent's latest example,
> > UserRegionLambdaExample. See the relevant code at the end of this email.
> >
> > Am I correct in understanding that KTable semantics should be similar to
> > a store-backed cache of a view (as per Wikipedia on materialized views),
> > or similar to Oracle's materialized views and indexed views? More
> > specifically, I am looking at when a (key, null value) pair can make it
> > into a KTable when generating a table from a valid KStream with a false
> > filter.
> >
> > Here's the relevant code, modified from the example, for which I observed
> > that all keys within userRegions are sent out to topic LargeRegions with
> > a null value. I would think that both the regionCounts KTable and topic
> > LargeRegions should be empty so that the cached view agrees with the
> > intended query (a query with an intentionally empty result set, since the
> > filter 1 >= 2 is always false).
> >
> > I am not sure I understand 

Re: KTable.filter usage, memory consumption and materialized view semantics

2016-06-23 Thread Guozhang Wang
Hello Philippe,

I think your question really has two parts:

1. What is the semantic difference between a KTable and a KStream, and more
specifically, how should we interpret (key, null) in a KTable?

You can find some explanations in this documentation:
http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream

Note that a KTable is itself still a stream behind the scenes, although it
may be materialized when necessary. Specifically to your question, (key,
null) can be treated as a tombstone for the specified key, and when this
KTable stream is materialized, it results in a "delete" on the
materialized view.


2. As for the "filter" operator, yes, it will generate a large number of
(key, null) records, which indicate "delete" in the resulting KTable and
hence cause heavy traffic to the piped topic. But we are working on KIP-63,
which unifies the caching mechanism, including in the `KTable.to`
operator, so that de-duping can be done in this operator and the outgoing
traffic can be largely reduced:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
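Both points can be illustrated with a small plain-Java model. It is a hypothetical sketch, not Kafka's state store, caching layer, or the KIP-63 design: ChangelogSketch and DedupBuffer are made-up names, and flushing on an explicit call stands in for whatever flush trigger a real implementation would use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model (no Kafka classes):
//  - point 1: applying a changelog to a materialized view, where (key, null) deletes the key;
//  - point 2: a per-key buffer that coalesces updates before forwarding them. This only
//    shows the general idea of de-duping, not KIP-63's actual cache.
final class ChangelogSketch {

    /** Applies one changelog record to a materialized view. */
    static void apply(Map<String, String> view, String key, String value) {
        if (value == null) {
            view.remove(key);     // tombstone: "delete" on the materialized view
        } else {
            view.put(key, value); // upsert: the latest value for the key wins
        }
    }

    /** Buffers records per key and forwards only the latest one per key on flush. */
    static final class DedupBuffer {
        private final Map<String, String> pending = new LinkedHashMap<>();
        final List<String> forwarded = new ArrayList<>();

        void put(String key, String value) {
            pending.put(key, value); // overwrites any earlier buffered value, including null
        }

        void flush() {
            for (Map.Entry<String, String> e : pending.entrySet()) {
                forwarded.add(e.getKey() + "=" + e.getValue());
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        Map<String, String> view = new LinkedHashMap<>();
        DedupBuffer buffer = new DedupBuffer();
        String[][] records = {{"alice", "asia"}, {"bob", "europe"}, {"alice", null}};
        for (String[] r : records) {
            apply(view, r[0], r[1]);
            buffer.put(r[0], r[1]);
        }
        buffer.flush();
        System.out.println(view);             // {bob=europe}
        System.out.println(buffer.forwarded); // [alice=null, bob=europe]
    }
}
```

With the three records in main, the view ends up as {bob=europe}, and the buffer forwards two records instead of three because alice's update and her later tombstone collapse into a single tombstone.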


Guozhang


On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome  wrote:

> I made a modification of Confluent's latest example,
> UserRegionLambdaExample. See the relevant code at the end of this email.
>
> Am I correct in understanding that KTable semantics should be similar to a
> store-backed cache of a view (as per Wikipedia on materialized views), or
> similar to Oracle's materialized views and indexed views? More
> specifically, I am looking at when a (key, null value) pair can make it
> into a KTable when generating a table from a valid KStream with a false
> filter.
>
> Here's the relevant code, modified from the example, for which I observed
> that all keys within userRegions are sent out to topic LargeRegions with a
> null value. I would think that both the regionCounts KTable and topic
> LargeRegions should be empty so that the cached view agrees with the
> intended query (a query with an intentionally empty result set, since the
> filter 1 >= 2 is always false).
>
> I am not sure I understand the implications properly, as I am new, but it
> seems possible that a highly selective filter on a large incoming stream
> would result in high memory usage for regionCounts and hence for the
> stream application.
>
> KTable<String, String> regionCounts = userRegions
>     // Count by region
>     // We do not need to specify any explicit serdes because the key
>     // and value types do not change
>     .groupBy((userId, region) -> KeyValue.pair(region, region))
>     .count("CountsByRegion")
>     // discard any regions FOR SAKE OF EXAMPLE
>     .filter((regionName, count) -> 1 >= 2)
>     .mapValues(count -> count.toString());
>
> KStream<String, String> regionCountsForConsole = regionCounts.toStream();
>
> regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
>



-- 
-- Guozhang


KTable.filter usage, memory consumption and materialized view semantics

2016-06-23 Thread Philippe Derome
I made a modification of Confluent's latest example,
UserRegionLambdaExample. See the relevant code at the end of this email.

Am I correct in understanding that KTable semantics should be similar to a
store-backed cache of a view (as per Wikipedia on materialized views), or
similar to Oracle's materialized views and indexed views? More
specifically, I am looking at when a (key, null value) pair can make it
into a KTable when generating a table from a valid KStream with a false
filter.

Here's the relevant code, modified from the example, for which I observed
that all keys within userRegions are sent out to topic LargeRegions with a
null value. I would think that both the regionCounts KTable and topic
LargeRegions should be empty so that the cached view agrees with the
intended query (a query with an intentionally empty result set, since the
filter 1 >= 2 is always false).

I am not sure I understand the implications properly, as I am new, but it
seems possible that a highly selective filter on a large incoming stream
would result in high memory usage for regionCounts and hence for the
stream application.

KTable<String, String> regionCounts = userRegions
    // Count by region
    // We do not need to specify any explicit serdes because the key
    // and value types do not change
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion")
    // discard any regions FOR SAKE OF EXAMPLE
    .filter((regionName, count) -> 1 >= 2)
    .mapValues(count -> count.toString());

KStream<String, String> regionCountsForConsole = regionCounts.toStream();

regionCountsForConsole.to(stringSerde, stringSerde, "LargeRegions");
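The observed behaviour can be reproduced with a plain-Java model of the pipeline above. This is a hypothetical sketch, not Kafka Streams: it treats the input as a simple sequence of region names rather than the userRegions KTable (so it ignores count decrements when a user changes region), and FilteredCounts and its run method are made-up names. It follows the behaviour described in the replies in this thread, where a rejected update is forwarded as a (key, null) tombstone and applied as a delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiPredicate;

// Hypothetical plain-Java model of: groupBy(region).count() -> filter(...) -> mapValues(toString).
// Simplification: the input is a sequence of region names, not a KTable of userId -> region.
final class FilteredCounts {

    /**
     * Counts each region, applies the predicate to the new count, and appends every
     * record the filtered table forwards (including tombstones) to changelog.
     * Returns the filtered table's materialized contents after all records are applied.
     */
    static Map<String, String> run(String[] regions, BiPredicate<String, Long> predicate,
                                   List<String> changelog) {
        Map<String, Long> regionCounts1 = new HashMap<>();   // upstream count per region
        Map<String, String> regionCounts = new TreeMap<>();  // materialized filtered view
        for (String region : regions) {
            long count = regionCounts1.merge(region, 1L, Long::sum);
            if (predicate.test(region, count)) {
                String value = Long.toString(count);         // mapValues(count -> count.toString())
                changelog.add(region + "=" + value);
                regionCounts.put(region, value);
            } else {
                changelog.add(region + "=null");             // tombstone sent downstream
                regionCounts.remove(region);                 // and applied as a delete
            }
        }
        return regionCounts;
    }

    public static void main(String[] args) {
        List<String> changelog = new ArrayList<>();
        Map<String, String> view = run(new String[] {"asia", "europe", "asia"},
                (regionName, count) -> false, changelog);
        System.out.println(changelog); // [asia=null, europe=null, asia=null]
        System.out.println(view);      // {}
    }
}
```

With an always-false predicate, every update reaches the output as a tombstone, while the materialized contents of regionCounts stay empty because each tombstone deletes its key; in this model the cost shows up as output traffic rather than as entries kept in the filtered view.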