Re: [DISCUSS] KIP-606: Add Metadata Context to MetricsReporter

2020-05-22 Thread Thomas Becker
This looks useful. The only nit I would pick is to name the MetricsReporter method contextChanged (past tense), which seems more conventional for methods like this.
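For anyone skimming, a rough sketch of the shape under discussion as I read the KIP draft -- treat the names and fields as illustrative, not the final API:

import java.util.Map;

// Illustrative only -- not the final KIP-606 API.
interface MetricsContext {
    // Metadata labels describing where metrics come from,
    // e.g. a cluster id or broker id supplied by the runtime.
    Map<String, String> contextLabels();
}

interface MetadataAwareReporter {
    // The callback whose name is being debated: the KIP draft calls it
    // contextChange; this message suggests the past tense contextChanged.
    default void contextChange(MetricsContext metricsContext) {}
}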


On Tue, 2020-05-05 at 16:58 -0700, Xavier Léauté wrote:

Hi Everyone,


I've published a KIP to address some shortcomings of our current metrics reporter interface. Would appreciate feedback.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-606%3A+Add+Metadata+Context+to+MetricsReporter


Thank you,

Xavier




Re: [DISCUSS] KIP-566: Add rebalance callbacks to ConsumerInterceptor

2020-05-05 Thread Thomas Becker
Bumping to get some attention on this KIP before initiating a vote. Using ConsumerInterceptor for its intended purpose is quite difficult without this.


On Mon, 2020-02-10 at 15:50 +, Thomas Becker wrote:

Bumping this again for visibility. If no one has any comments, maybe I'll just 
start the VOTE thread?


On Wed, 2020-01-29 at 22:24 +0000, Thomas Becker wrote:

Bumping this, hoping to get some additional feedback.



From: M. Manna <manme...@gmail.com>
Sent: Thursday, January 23, 2020 4:37 PM
To: dev@kafka.apache.org

Subject: Re: [DISCUSS] KIP-566: Add rebalance callbacks to ConsumerInterceptor



Hey Thomas,


On Thu, 23 Jan 2020 at 21:17, Thomas Becker <thomas.bec...@tivo.com> wrote:


Hi folks,

I'd like to open the discussion for KIP-566: Add rebalance callbacks to ConsumerInterceptor. We've been looking to implement some custom metrics via ConsumerInterceptor, and not knowing when partition ownership changes is a significant impediment. I'd appreciate your thoughts.



https://cwiki.apache.org/confluence/display/KAFKA/KIP-566%3A+Add+rebalance+callbacks+to+ConsumerInterceptor
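To make the idea concrete, a minimal sketch of what rebalance callbacks on the interceptor could look like; the method names mirror ConsumerRebalanceListener and are an assumption, not the KIP's final signatures:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.common.TopicPartition;

// Hypothetical extension of ConsumerInterceptor with rebalance callbacks,
// defaulted to no-ops so existing interceptors keep compiling.
interface RebalanceAwareInterceptor<K, V> extends ConsumerInterceptor<K, V> {
    default void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    default void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
}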




I had a quick read through the KIP. I don't see any obvious issues. Sounds like a simple improvement. Perhaps you could add your thoughts about the RebalanceListener API in the future, e.g. when to unify the functionality. If implemented, we can simply use one implementation for both things.

I would be interested to hear others' comments about this.


Thanks,

Re: [DISCUSS] KIP-566: Add rebalance callbacks to ConsumerInterceptor

2020-02-10 Thread Thomas Becker
Bumping this again for visibility. If no one has any comments, maybe I'll just 
start the VOTE thread?


On Wed, 2020-01-29 at 22:24 +0000, Thomas Becker wrote:

Bumping this, hoping to get some additional feedback.





Re: [KAFKA-557] Add emit on change support for Kafka Streams

2020-02-04 Thread Thomas Becker
stered via config. You could say that we make ChangeDetector an optional parameter to every operation in Streams, but this seems to carry quite a bit of mental burden with it. People will wonder what it's for and whether or not they should be using it. There would almost certainly be a misconception that it's preferable to implement it always, which would be unfortunate. Plus, to actually implement metadata flowing through the topology as in your use case, you'd have to do two things:

1. make sure that all operations actually preserve the metadata alongside the data (e.g., don't accidentally add a mapValues like I did, or you drop the metadata).
2. implement a ChangeDetector for every single operation in the topology, or you don't get the benefit of dropping non-changes internally.
2b. Alternatively, you could just add the ChangeDetector to one operation toward the end of the topology. This would not drop redundant computation internally, but only drop redundant _outputs_. But this is just about the same as your current solution.


I definitely see your point regarding configuration. I was originally thinking about this when the deduplication was going to be opt-in, and it seemed very natural to say something like:

employeeInfo.join(employeePayroll, (info, payroll) -> new Result(info.name(), payroll.salary()))
    .suppress(duplicatesAccordingTo(someChangeDetector))

Alternatively you can imagine a similar method being on Materialized, though obviously this makes less sense if we don't want to require materialization. If we're now talking about changing the default behavior and not having any configuration options, it's harder to find a place for this.



A final thought; if it really is a metadata question, can we just plan to finish up the support for headers in Streams? I.e., give you a way to control the way that headers flow through the topology? Then, we could treat headers the same way we treat timestamps in the no-op checking... We completely ignore them for the sake of comparison. Thus, neither the timestamp nor the headers would get updated in internal state or in downstream views as long as the value itself doesn't change. This seems to give us a way to support your use case without adding to the mental overhead of using Streams for simple things.

Agree headers could be a decent fit for this particular case because it's 
mostly metadata, though to be honest we haven't looked at headers much (mostly 
because, and to your point, support seems to be lacking). I feel like there 
would be other cases where this feature could be valuable, but I admit I can't 
come up with anything right this second. Perhaps yuzhihong had an example in 
mind?


I.e., simple things should be easy, and complex things should be possible.

What are your thoughts?
Thanks,
-John



Re: [KAFKA-557] Add emit on change support for Kafka Streams

2020-02-03 Thread Thomas Becker
Hi John,
Can you describe how you'd use filtering/mapping to deduplicate records? To give some background on my suggestion: we currently have a small stream processor that exists solely to deduplicate, which we do using a process that I assume would be similar to what would be done here (with a store of keys and hash values). But the records we are deduplicating have some metadata fields (such as timestamps of when the record was posted) that we don't consider semantically meaningful for downstream consumers, and therefore we also suppress updates that only touch those fields.

-Tommy
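A minimal sketch of the kind of dedup step described above, for illustration -- the store name, the hashing, and the use of ValueTransformerWithKey are assumptions, not the actual processor:

import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: emits null for unchanged values; the caller filters nulls out.
class DedupTransformer implements ValueTransformerWithKey<String, String, String> {
    private KeyValueStore<String, Integer> hashes;

    @SuppressWarnings("unchecked")
    @Override
    public void init(final ProcessorContext context) {
        hashes = (KeyValueStore<String, Integer>) context.getStateStore("dedup-hashes");
    }

    @Override
    public String transform(final String key, final String value) {
        // In practice you would hash only the semantically meaningful fields,
        // ignoring metadata like posted-at timestamps.
        final int hash = value.hashCode();
        final Integer previous = hashes.get(key);
        if (previous != null && previous == hash) {
            return null; // no semantic change: suppress the update
        }
        hashes.put(key, hash);
        return value;
    }

    @Override
    public void close() {}
}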


On Fri, 2020-01-31 at 19:30 -0600, John Roesler wrote:


Hi Thomas and yuzhihong,

That’s an interesting idea. Can you help think of a use case that isn’t also served by filtering or mapping beforehand?

Thanks for helping to design this feature!
-John


On Fri, Jan 31, 2020, at 18:56, yuzhih...@gmail.com wrote:

I think this is a good idea.


On Jan 31, 2020, at 4:49 PM, Thomas Becker <thomas.bec...@tivo.com> wrote:

How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector. This could be useful for example to ignore changes to certain fields, which may not be relevant to the operation being performed.




Re: [KAFKA-557] Add emit on change support for Kafka Streams

2020-01-31 Thread Thomas Becker
How do folks feel about allowing the mechanism by which no-ops are detected to be pluggable? Meaning use something like a hash by default, but you could optionally provide an implementation of something to use instead, like a ChangeDetector. This could be useful for example to ignore changes to certain fields, which may not be relevant to the operation being performed.
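To make the suggestion concrete, one possible shape for such a hook -- the ChangeDetector name is from the message above; the signature is a guess:

import java.util.Objects;

// Hypothetical strategy interface for pluggable no-op detection.
@FunctionalInterface
interface ChangeDetector<V> {
    // Return true if newValue is a meaningful change from oldValue.
    boolean isChanged(V oldValue, V newValue);
}

// The default could stay equality/hash based, while a custom detector
// could ignore fields that aren't relevant to the operation:
class EqualityChangeDetector<V> implements ChangeDetector<V> {
    @Override
    public boolean isChanged(final V oldValue, final V newValue) {
        return !Objects.equals(oldValue, newValue);
    }
}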

From: John Roesler 
Sent: Friday, January 31, 2020 4:51 PM
To: dev@kafka.apache.org 
Subject: Re: [KAFKA-557] Add emit on change support for Kafka Streams




Hello all,

Sorry for my silence. It seems like we are getting close to consensus.
Hopefully, we could move to a vote soon!

All of the reasoning from Matthias and Bruno around timestamp is compelling. I
would be strongly in favor of stating a few things very clearly in the KIP:
1. Streams will drop no-op updates only for KTable operations.

   That is, we won't make any changes to KStream aggregations at the moment. It
   does seem like we can potentially revisit the time semantics of that 
operation
   in the future, but we don't need to do it now.

   On the other hand, the proposed semantics for KTable timestamps (marking the
   beginning of the validity of that record) makes sense to me.

2. Streams will only drop no-op updates for _stateful_ KTable operations.

   We don't want to add a hard guarantee that Streams will _never_ emit a no-op
   table update because it would require adding state to otherwise stateless
   operations. If someone is really concerned about a particular stateless
   operation producing a lot of no-op results, all they have to do is
   materialize it, and Streams would automatically drop the no-ops.
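A short illustration of that last point, using the 2.x DSL (String::trim stands in for arbitrary stateless logic):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class NoOpDroppingExample {
    // Materializing an otherwise stateless mapValues gives Streams the state
    // it needs to compare old and new values and drop no-op updates.
    static KTable<String, String> normalized(final KTable<String, String> table) {
        return table.mapValues(String::trim,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("normalized-store"));
    }
}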

Additionally, I'm +1 on not adding an opt-out at this time.

Regarding the KIP itself, I would clean it up a bit before calling for a vote.
There is a lot of "discussion"-type language there, which is very natural to
read, but makes it a bit hard to see what _exactly_ the kip is proposing.

Richard, would you mind just making the "proposed behavior change" a simple and
succinct list of bullet points? I.e., please drop glue phrases like "there has
been some discussion" or "possibly we could do X". For the final version of the
KIP, it should just say, "Streams will do X, Streams will do Y". Feel free to
add an elaboration section to explain more about what X and Y mean, but we don't
need to talk about possibilities or alternatives except in the "rejected
alternatives" section.

Accordingly, can you also move the options you presented in the intro to the
"rejected alternatives" section and only mention the final proposal itself?

This just really helps reviewers to know what they are voting for, and it helps
everyone after the fact when they are trying to get clarity on what exactly the
proposal is, versus all the things it could have been.

Thanks,
-John


On Mon, Jan 27, 2020, at 18:14, Richard Yu wrote:
> Hello to all,
>
> I've finished making some initial modifications to the KIP.
> I have decided to keep the implementation section in the KIP for
> record-keeping purposes.
>
> For now, we should focus on only the proposed behavior changes instead.
>
> See if you have any comments!
>
> Cheers,
> Richard
>
> On Sat, Jan 25, 2020 at 11:12 AM Richard Yu 
> wrote:
>
> > Hi all,
> >
> > Thanks for all the discussion!
> >
> > @John and @Bruno I will survey other possible systems and see what I can
> > do.
> > Just a question, by systems, I suppose you would mean the pros and cons of
> > different reporting strategies?
> >
> > I'm not completely certain on this point, so it would be great if you can
> > clarify on that.
> >
> > So here's what I got from all the discussion so far:
> >
> >- Since both Matthias and John seems to have come to a consensus on
> >this, then we will go for an all-round behavorial change for KTables. 
> > After
> >some thought, I decided that for now, an opt-out config will not be 
> > added.
> >As John have pointed out, no-op changes tend to explode further down the
> >topology as they are forwarded to more and more processor nodes 
> > downstream.
> >- About using hash codes, after some explanation from John, it looks
> >like hash codes might not be as ideal (for implementation). For now, we
> >will omit that detail, and save it for the PR.
> >- @Bruno You do have valid concerns. Though, I am not completely
> >certain if we want to do emit-on-change only for materialized KTables. I
> >will put it down in the KIP regardless.
> >
> > I will do my best to address all points raised so far on the discussion.
> > Hope we could keep this going!
> >
> > Best,
> > Richard
> >
> > On Fri, Jan 24, 2020 at 6:07 PM Bruno Cadonna  wrote:
> >
> >> Thank you Matthias for the use cases!
> >>
> >> Looking at both 

Re: [DISCUSS] KIP-566: Add rebalance callbacks to ConsumerInterceptor

2020-01-29 Thread Thomas Becker
Bumping this, hoping to get some additional feedback.



[DISCUSS] KIP-566: Add rebalance callbacks to ConsumerInterceptor

2020-01-23 Thread Thomas Becker
Hi folks,
I'd like to open the discussion for KIP-566: Add rebalance callbacks to 
ConsumerInterceptor. We've been looking to implement some custom metrics via 
ConsumerInterceptor, and not knowing when partition ownership changes is a 
significant impediment. I'd appreciate your thoughts.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-566%3A+Add+rebalance+callbacks+to+ConsumerInterceptor








Create KIP Permission

2020-01-23 Thread Thomas Becker
I'd like permission to create a KIP please. My confluence account is twbecker.






Re: Proposal for EmbeddedKafkaCluster: Support JUnit 5 Extension in addition to JUnit 4 Rule

2019-12-09 Thread Thomas Becker
Thanks for the response John. I should point out that for streams app testing, 
we generally do use the TopologyTestDriver and it is much appreciated. We use 
EmbeddedKafkaCluster more for testing code that uses the Producer/Consumer 
clients.
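For reference, the usage pattern being discussed looks roughly like this; note EmbeddedKafkaCluster is an internal test class, so the constructor and helpers shown are illustrative:

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.ClassRule;
import org.junit.Test;

public class ClientIntegrationTest {
    // JUnit 4 class rule: starts an in-process broker for the test class.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @Test
    public void shouldRoundTripARecord() throws Exception {
        CLUSTER.createTopic("input");
        // ... point a KafkaProducer/KafkaConsumer at CLUSTER.bootstrapServers()
    }
}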

Regarding using something like maven-exec, I'm just not a fan. What port do you 
run the broker on? How do the files get cleaned up after the run? How do you 
run the tests from within the IDE? What if you want to step through the broker 
code? Most of these are solvable but I guess this comes down to your opinion of 
using unit test frameworks for what are really integration-level tests. We've 
found it to work very well, the only real issue is that you generally have to 
use the same broker and client version due to the former's dependency on the 
latter, even if you don't use that broker version in production.

I also don't quite understand the idea that the API is insufficient. How is it 
less sufficient than the API you get running a "real" broker in a separate 
process?

-Tommy

On Fri, 2019-12-06 at 12:59 -0600, John Roesler wrote:

Hi Tommy,

There are some practically ancient discussions around doing this exact thing, but they haven't generally come to fruition because EmbeddedKafkaCluster itself isn't really a slam dunk, usability-wise. You'll notice that copying it (i.e., using it _at all_) is the option of last resort in my prior message.

Clearly, we use it (heavily) in the Apache Kafka project's tests, but I can tell you that it's kind of a source of "constant sorrow". It's not that it doesn't "work" (it does), but it's more that it doesn't provide the "right" support for higher level testing of a Streams application (as opposed to testing the Streams framework itself). This is why we provided TopologyTestDriver: to provide you with the ability to test your application code in an efficient and sane manner.

As it stands, supposing you really can't use TopologyTestDriver, EmbeddedKafkaCluster offers no practical benefits over just having a broker running locally for your integration tests. My thinking is, why not just do that? You can use the maven-exec-plugin, for example, to start and stop the broker automatically for your tests.

By the way, I'm not saying that we should not offer a lower-level testing support utility, it's just that I don't think we should just move EmbeddedKafkaCluster into the public API. Particularly, for testing, we need something more efficient and that can be more synchronous, but still presents the same API as a broker. This would be a ton of work to design and build, though, which I assume is why no one has done it.

Thanks,
-John



Re: Proposal for EmbeddedKafkaCluster: Support JUnit 5 Extension in addition to JUnit 4 Rule

2019-12-06 Thread Thomas Becker
Personally, I would love to see EmbeddedKafkaCluster moved to a public test 
artifact, similarly to kafka-streams-test-utils. Having to copy/paste internal 
classes into your project is...unfortunate.


On Fri, 2019-12-06 at 10:42 -0600, John Roesler wrote:

Hi Matthias!

Thanks for the note, and the kind sentiment.

We're always open to improvements like this, so your contribution would certainly be welcome.

Just FYI, "public interface" changes need to go through the KIP process (see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals ). You could of course, open an exploratory PR and ask here for comments before deciding if you want to make a KIP, if you prefer. Personally, I often find it clarifying to put together a PR concurrently with the KIP, because it helps to flush out any "devils in the details", and also can help the KIP discussion.

Just wanted to make you aware of the process. I'm happy to help you navigate this process, by the way.

Regarding the specific proposal, the number one question in my mind is compatibility... I.e., do we add new dependencies, or change dependencies, that may conflict with what's already on users' classpaths? Would the change result in any source-code or binary incompatibility with previous versions? Etc... you get the idea.

We can make such changes, but it's a _lot_ easier to support doing it in a major release. Right now, that would be 3.0, which is not currently planned. We're currently working toward 2.5, and then 2.6, and so on, essentially waiting for a good reason to bump up to 3.0.

All that said, EmbeddedKafkaCluster is an interesting case, because it's not actually a public API!

When Bill wrote the book, he included it (I assume) because there was no other practical approach to testing available.

However, since the publication, we have added an official integration testing framework called TopologyTestDriver. When people ask me for testing advice, I tell them:

1. Use TopologyTestDriver (for Streams testing)
2. If you need a "real" broker for your test, then set up a pre/post integration test hook to run Kafka independently (e.g., with Maven).
3. If that's not practical, then _copy_ EmbeddedKafkaCluster into your own project, don't depend on an internal test artifact.
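A minimal example of option 1, assuming the 2.4+ test-utils API with TestInputTopic/TestOutputTopic:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

class TopologyTestDriverExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input")
                .mapValues(v -> v.toUpperCase())
                .to("output");

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // No broker needed: records are piped through the topology in-process.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> in = driver.createInputTopic(
                    "input", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> out = driver.createOutputTopic(
                    "output", new StringDeserializer(), new StringDeserializer());
            in.pipeInput("key", "hello");
            System.out.println(out.readValue()); // HELLO
        }
    }
}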


To me, this means that we essentially have free rein to make changes to EmbeddedKafkaCluster, since it _should_ only be used for internal testing. In that case, I would just evaluate the merits of bumping all the tests in Apache Kafka up to JUnit 5. Although that's not a public API, it might be a big enough change to the process to justify a design document and project-wide discussion.

Well, I guess it turns out I had more to say than I initially thought... Sorry for rambling a bit.

What are your thoughts?
-John


On Fri, Dec 6, 2019, at 07:05, Matthias Merdes wrote:
> Hi all,
>
> when reading 'Kafka Streams in Action' I especially enjoyed the thoughtful treatment of mocking, unit, and integration tests.
> Integration testing in the book (and in the Kafka codebase) is done using the @ClassRule-annotated EmbeddedKafkaCluster.
> JUnit 5 Jupiter replaced Rules with a different extension model.
> So my proposal would be to support a JUnit 5 Extension in addition to the JUnit 4 Rule for EmbeddedKafkaCluster to enable 'native' integration in JUnit 5-based projects.
> Being a committer with the JUnit 5 team I would be happy to contribute a PR for such an extension.
> Please let me know if you are interested.
> Cheers,
> Matthias


Re: [VOTE] KIP-476: Add Java AdminClient interface

2019-06-07 Thread Thomas Becker
+1 non-binding

We've run into issues trying to decorate the AdminClient due to it being an abstract class. It will be nice to have consistency with Producer/Consumer as well.
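For illustration, the kind of decoration that becomes trivial once there is an interface (Admin is the interface name this KIP eventually added; only one method is shown, and a real decorator would implement them all):

import java.util.Collection;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

// Illustrative delegating decorator; "implements Admin" is omitted so this
// sketch compiles without spelling out every method of the interface.
class TimingAdmin {
    private final Admin delegate;

    TimingAdmin(final Admin delegate) {
        this.delegate = delegate;
    }

    public CreateTopicsResult createTopics(final Collection<NewTopic> topics) {
        final long start = System.nanoTime();
        try {
            return delegate.createTopics(topics);
        } finally {
            System.out.printf("createTopics took %d ns%n", System.nanoTime() - start);
        }
    }
}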

On Tue, 2019-06-04 at 17:17 +0100, Andy Coates wrote:

Hi folks

As there's been no chatter on this KIP I'm assuming it's non-contentious, (or just boring), hence I'd like to call a vote for KIP-476:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-476%3A+Add+Java+AdminClient+Interface

Thanks,

Andy




Re: [VOTE] KIP-349 Priorities for Source Topics

2019-01-24 Thread Thomas Becker
Yes, I think this type of strategy interface would be valuable.

On Wed, 2019-01-16 at 15:41 +0000, Jan Filipiak wrote:


On 16.01.2019 14:05, Thomas Becker wrote:

I'm going to bow out of this discussion since it's been made clear that the feature is not targeted at streams. But for the record, my desire is to have an alternative to the timestamp based message choosing strategy streams currently imposes, and I thought topic prioritization in the consumer could potentially enable that. See https://issues.apache.org/jira/browse/KAFKA-4113

-Tommy

Would you be so kind to leave an impression about a MessageChooser interface? Might be important for an extra KIP later.

Best Jan
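For reference, a rough Kafka-flavored analogue of Samza's two-method chooser; the types and names are only a sketch of what such a KIP might propose:

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Mirrors the shape of Samza's MessageChooser: feed it buffered records,
// then ask it which one to process next.
interface MessageChooser<K, V> {
    // Offer a fetched record to the chooser's buffer.
    void update(ConsumerRecord<K, V> record);

    // Pick the next record to process, or null if nothing should be chosen yet.
    ConsumerRecord<K, V> choose();
}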




Re: [VOTE] KIP-349 Priorities for Source Topics

2019-01-16 Thread Thomas Becker
I'm going to bow out of this discussion since it's been made clear that the 
feature is not targeted at streams. But for the record, my desire is to have an 
alternative to the timestamp based message choosing strategy streams currently 
imposes, and I thought topic prioritization in the consumer could potentially 
enable that. See https://issues.apache.org/jira/browse/KAFKA-4113

-Tommy

On Mon, 2019-01-14 at 19:19 -0500, n...@afshartous.com wrote:

Hi Jan,

As discussed, I've adopted the position that MessageChooser is orthogonal to topic prioritization and hence outside the scope of KIP-349.
--
  Nick



On Jan 14, 2019, at 12:47 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

On 14.01.2019 02:48, n...@afshartous.com wrote:



On reflection, it would be hard to describe the semantics of an API that tried to address starvation by temporarily disabling prioritization, and then oscillating back and forth. Thus I agree that it makes sense not to try and address starvation, to Matthias' point that this is intended by design. The KIP has been updated to reflect this by removing the second method.



The semantics of almost everything are hard to describe with only those two tools at hand. Just here to remind y'all that Samza already shows us the interface of a powerful enough abstraction to get stuff done :)

https://samza.apache.org/learn/documentation/0.12/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html

welcome :)








Re: [VOTE] KIP-349 Priorities for Source Topics

2018-10-08 Thread Thomas Becker
Well my (perhaps flawed) understanding of topic priorities is that lower 
priority topics are not consumed as long as higher priority ones have 
unconsumed messages (which means our position < HW). So if I'm doing this 
manually, I have to make some determination as to whether my high priority 
topic partitions are at the HW before I can decide if I want to poll the lower 
priority ones. Right?
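Roughly, the manual version might look like this -- a sketch, assuming read_uncommitted isolation so that endOffsets reflects the high watermark:

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class PriorityPolling {
    // Pause low-priority partitions until the high-priority ones are caught up.
    static ConsumerRecords<String, String> pollWithPriority(
            final Consumer<String, String> consumer,
            final Set<TopicPartition> highPriority,
            final Set<TopicPartition> lowPriority) {
        final Map<TopicPartition, Long> end = consumer.endOffsets(highPriority);
        final boolean caughtUp = highPriority.stream()
                .allMatch(tp -> consumer.position(tp) >= end.get(tp));
        if (caughtUp) {
            consumer.resume(lowPriority);
        } else {
            consumer.pause(lowPriority);
        }
        return consumer.poll(Duration.ofMillis(100));
    }
}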

On Fri, 2018-10-05 at 11:34 -0700, Colin McCabe wrote:

On Fri, Oct 5, 2018, at 10:58, Thomas Becker wrote:

Colin,

Would you mind sharing your vision for how this looks with multiple consumers? I'm still getting my bearings with the new consumer but it's not immediately obvious to me how this would work.

Hi Thomas,

I was just responding to the general idea that you would have some kind of control topic that you wanted to read with very low latency, and some kind of set of data topics where the latency requirements are less strict.  In that case, you can just have two consumers: one for the low-latency topic, and one for the less low-latency topics.

There's a lot of things in this picture that are unclear.  Does the data in one set of topics have any relation to the data in the other?  Why do we want a control channel distinct from the data channel?  That's why I asked for clarification on the use-case.

In particular, it doesn't seem particularly easy to know when you are at the high watermark of a topic.

KafkaConsumer#committed will return the last committed offset for a partition.  However, I'm not sure I understand why you want this information in this case -- can you expand a bit on this?

best,
Colin




Re: [VOTE] KIP-349 Priorities for Source Topics

2018-10-05 Thread Thomas Becker
Colin,
Would you mind sharing your vision for how this looks with multiple consumers? 
I'm still getting my bearings with the new consumer but it's not immediately 
obvious to me how this would work. In particular, it doesn't seem particularly 
easy to know when you are at the high watermark of a topic.

-Tommy

On Mon, 2018-10-01 at 13:43 -0700, Colin McCabe wrote:

Hi all,

I feel like the DISCUSS thread didn't really come to a conclusion, so a vote would be premature here.

In particular, I still don't really understand the use-case for this feature.  Can someone give a concrete scenario where you would need this?  The control plane / data plane example that is listed in the KIP doesn't require this feature.  You can just have one consumer for the control plane, and one for the data plane, and do priority that way.  The discussion feels kind of unfocused since we haven't identified even one concrete use-case that needs this feature.

Unfortunately, this is a feature which consumes server-side memory.  We have to store the priorities somehow when doing incremental fetch requests.  If we go with an int as suggested, then this is at least 4 bytes per partition per incremental fetch request.  It also makes it more complex and potentially slower to maintain the linked list of partitions in the fetch requests.  Before we think about this, I'd like to have a concrete use-case in mind, so that we can evaluate the costs versus benefits.

best,
Colin



On Mon, Oct 1, 2018, at 07:47, Dongjin Lee wrote:

Great. +1 (non-binding)

On Mon, Oct 1, 2018 at 4:23 AM Matthias J. Sax <matth...@confluent.io> wrote:


+1 (binding)

As Dongjin pointed out, the community is working on the upcoming 2.1 release, and thus it might take some time until people find time to follow up on this and vote.

-Matthias


On 9/30/18 11:11 AM, n...@afshartous.com wrote:

On Sep 30, 2018, at 5:16 AM, Dongjin Lee <dong...@apache.org> wrote:

1. Your KIP document <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics> lacks a hyperlink to the discussion thread. And I couldn't find the discussion thread in the mailing archive.

Hi Dongjin,

There has been a discussion thread.  I added this link as a reference

  https://lists.apache.org/list.html?dev@kafka.apache.org:lte=1M:kip-349

to the KIP-349 page

  https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics

Best,
--
  Nick








Re: [DISCUSS] KIP-349 Priorities for Source Topics

2018-09-17 Thread Thomas Becker
To sum up here, I don't disagree that timestamp semantics are nice and often 
useful. But currently, there is no way to opt-out of these semantics. In our 
case the timestamp of an updated item record, which say, provides a better or 
corrected description, is simply preferred over the old record, period. Trying 
to force a temporal relationship between that and an event where the item was 
viewed is non-sensical.


On Mon, 2018-09-17 at 18:18 +0000, Thomas Becker wrote:

Hi Matthias,

I'm familiar with how the timestamp synchronization currently works. I also submit that it does not work for our use-case, which is the following: The table-backing topic contains records with the best available data we have for a given item. If a record in this topic is updated, we would always prefer to join using this data *regardless* of whether it is "newer" than the incoming event we are trying to join it with.

Essentially, streams assumes that we must want the table data that was current at the time the event was produced, and here we simply don't. If we have newer data, we want that. But my larger concern here is actually reprocessing; when doing that the older table-data will be log compacted away and the current timestamp semantics will result in events that occurred prior to the latest table updates being unjoined. Does this make sense now?

Thanks!
Tommy


On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:

I am not sure if this feature would help with stream-table joins. Also note, that we recently merged a PR that improves the timestamp synchronization of Kafka Streams -- this will vastly improve the guarantees.

What I don't understand:

So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp.

Yes, this is on purpose / by design.

and if they do it will be with old data

What do you mean by "old data"? By definition, the stream record will join with a table that contains data up-to the stream record's timestamp. It does semantically not make sense to advance the table beyond the stream record's timestamp, because if you do this, you would semantically join with "future data" what---from my point of view---is semantically incorrect.

Shameless plug: you might want to read https://www.confluent.io/blog/streams-tables-two-sides-same-coin

-Matthias



On 9/17/18 8:23 AM, Thomas Becker wrote:

For my part, a major use-case for this feature is stream-table joins. Currently, KafkaStreams does the wrong thing in some cases because the only message choosing strategy available is timestamp-based. So table records that have been updated recently will not be read until the stream records reach or exceed that same timestamp. So there is no guarantee these records get joined at all, and if they do it will be with old data. I realize we're talking about the consumer here and not streams specifically, but as it stands I can't even write a non-streams application that does a join but prioritizes table-topic records over stream records without using multiple consumers.



On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:



Hi all,




I agree that DISCUSS is more appropriate than VOTE at this point, since I don't 
remember the last discussion coming to a definite conclusion.




I guess my concern is that this will add complexity and memory consumption on 
the server side.  In the case of incremental fetch requests, we will have to 
track at least two extra bytes per partition, to know what the priority of each 
partition is within each active fetch session.




It would be nice to hear more about the use-cases for this feature.  I think 
Gwen asked about this earlier, and I don't remember reading a response.  The 
fact that we're now talking about Samza interfaces is a bit of a red flag.  
After all, Samza didn't need partition priorities to do what it did.  You can 
do a lot with muting partitions and using appropriate threading in your code.




For example, you can hand data from a partition off to a work queue with a 
fixed size, which is handled by a separate service thread.  If the queue gets 
full, you can mute the partition until some of the buffered data is processed.  
Kafka Streams uses a similar approach to avoid reading partition data that 
isn't immediately needed.
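
A minimal sketch of the mute-and-buffer pattern described above, using only the 
public consumer API; the class name, queue capacity, and the drain thread are 
invented for illustration:

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MuteAndBuffer {
    private static final int CAPACITY = 1000; // invented; tune to your processing rate

    // One bounded queue per partition; a separate service thread (not shown) drains them.
    private final Map<TopicPartition, BlockingQueue<ConsumerRecord<String, String>>> queues = new HashMap<>();

    void pollLoop(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> rec : records) {
                TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
                BlockingQueue<ConsumerRecord<String, String>> q =
                        queues.computeIfAbsent(tp, k -> new ArrayBlockingQueue<>(CAPACITY));
                q.put(rec); // blocks briefly if full; the service thread drains concurrently
                if (q.remainingCapacity() == 0) {
                    consumer.pause(Collections.singleton(tp)); // mute until buffered data is processed
                }
            }
            // Un-mute partitions whose queues have drained; resume() is a no-op if not paused.
            queues.forEach((tp, q) -> {
                if (q.remainingCapacity() > CAPACITY / 2) {
                    consumer.resume(Collections.singleton(tp));
                }
            });
        }
    }
}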




There might be some use-cases that need priorities eventually, but I'm 
concerned that we're jumping the gun by trying to implement this before we know 
what they are.




best,



Colin





On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:




On 05.09.2018 02:38, n...@afshartous.com wrote:




On Sep 4, 2018, at 4:20 PM, Jan Fi

Re: [DISCUSS] KIP-349 Priorities for Source Topics

2018-09-17 Thread Thomas Becker
Hi Matthias,
I'm familiar with how the timestamp synchronization currently works. I also 
submit that it does not work for our use-case, which is the following: The 
table-backing topic contains records with the best available data we have for a 
given item. IF a record in this topic is updated, we would always prefer to 
join using this data *regardless* of whether it is "newer" than the incoming 
event we are trying to join it with.

Essentially, streams assumes that we must want the table data that was current 
at the time the event was produced, and here we simply don't. If we have newer 
data, we want that. But my larger concern here is actually reprocessing; when 
doing that the older table-data will be log compacted away and the current 
timestamp semantics will result in events that occurred prior to the latest 
table updates being unjoined. Does this make sense now?

Thanks!
Tommy

On Mon, 2018-09-17 at 09:51 -0700, Matthias J. Sax wrote:

I am not sure if this feature would help with stream-table joins. Also

note, that we recently merged a PR that improves the timestamp

synchronization of Kafka Streams -- this will vastly improve the guarantees.


What I don't understand:


So table records that have been updated recently will not be read until the 
stream records reach or exceed that same timestamp.


Yes, this is on purpose / by design.


and if they do it will be with old data


What do you mean by "old data"? By definition, the stream record will

join with a table that contains data up-to the stream record's

timestamp. It does semantically not make sense to advance the table

beyond the stream record's timestamp, because if you do this, you would

semantically join with "future data" what---from my point of view---is

semantically incorrect.


Shameless plug: you might want to read

https://www.confluent.io/blog/streams-tables-two-sides-same-coin




-Matthias


On 9/17/18 8:23 AM, Thomas Becker wrote:

For my part, a major use-case for this feature is stream-table joins. 
Currently, KafkaStreams does the wrong thing in some cases because the only 
message choosing strategy available is timestamp-based. So table records that 
have been updated recently will not be read until the stream records reach or 
exceed that same timestamp. So there is no guarantee these records get joined 
at all, and if they do it will be with old data. I realize we're talking about 
the consumer here and not streams specifically, but as it stands I can't even 
write a non-streams application that does a join but prioritizes table-topic 
records over stream records without using multiple consumers.


On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:


Hi all,



I agree that DISCUSS is more appropriate than VOTE at this point, since I don't 
remember the last discussion coming to a definite conclusion.



I guess my concern is that this will add complexity and memory consumption on 
the server side.  In the case of incremental fetch requests, we will have to 
track at least two extra bytes per partition, to know what the priority of each 
partition is within each active fetch session.



It would be nice to hear more about the use-cases for this feature.  I think 
Gwen asked about this earlier, and I don't remember reading a response.  The 
fact that we're now talking about Samza interfaces is a bit of a red flag.  
After all, Samza didn't need partition priorities to do what it did.  You can 
do a lot with muting partitions and using appropriate threading in your code.



For example, you can hand data from a partition off to a work queue with a 
fixed size, which is handled by a separate service thread.  If the queue gets 
full, you can mute the partition until some of the buffered data is processed.  
Kafka Streams uses a similar approach to avoid reading partition data that 
isn't immediately needed.



There might be some use-cases that need priorities eventually, but I'm 
concerned that we're jumping the gun by trying to implement this before we know 
what they are.



best,


Colin




On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:



On 05.09.2018 02:38, n...@afshartous.com wrote:



On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:



what I meant is literally this interface:



https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html


Hi Jan,



Thanks for the reply and I have a few questions.  This Samza doc



   https://samza.apache.org/learn/documentation/0.14/container/streams.html

Re: [DISCUSS] KIP-349 Priorities for Source Topics

2018-09-17 Thread Thomas Becker
For my part, a major use-case for this feature is stream-table joins. 
Currently, KafkaStreams does the wrong thing in some cases because the only 
message choosing strategy available is timestamp-based. So table records that 
have been updated recently will not be read until the stream records reach or 
exceed that same timestamp. So there is no guarantee these records get joined 
at all, and if they do it will be with old data. I realize we're talking about 
the consumer here and not streams specifically, but as it stands I can't even 
write a non-streams application that does a join but prioritizes table-topic 
records over stream records without using multiple consumers.

On Wed, 2018-09-05 at 08:18 -0700, Colin McCabe wrote:

Hi all,


I agree that DISCUSS is more appropriate than VOTE at this point, since I don't 
remember the last discussion coming to a definite conclusion.


I guess my concern is that this will add complexity and memory consumption on 
the server side.  In the case of incremental fetch requests, we will have to 
track at least two extra bytes per partition, to know what the priority of each 
partition is within each active fetch session.


It would be nice to hear more about the use-cases for this feature.  I think 
Gwen asked about this earlier, and I don't remember reading a response.  The 
fact that we're now talking about Samza interfaces is a bit of a red flag.  
After all, Samza didn't need partition priorities to do what it did.  You can 
do a lot with muting partitions and using appropriate threading in your code.


For example, you can hand data from a partition off to a work queue with a 
fixed size, which is handled by a separate service thread.  If the queue gets 
full, you can mute the partition until some of the buffered data is processed.  
Kafka Streams uses a similar approach to avoid reading partition data that 
isn't immediately needed.


There might be some use-cases that need priorities eventually, but I'm 
concerned that we're jumping the gun by trying to implement this before we know 
what they are.


best,

Colin



On Wed, Sep 5, 2018, at 01:06, Jan Filipiak wrote:


On 05.09.2018 02:38, n...@afshartous.com wrote:


On Sep 4, 2018, at 4:20 PM, Jan Filipiak <jan.filip...@trivago.com> wrote:


what I meant is literally this interface:


https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html
 


Hi Jan,


Thanks for the reply and I have a few questions.  This Samza doc


   https://samza.apache.org/learn/documentation/0.14/container/streams.html 



indicates that the chooser is set via configuration.  Are you suggesting adding 
a new configuration for Kafka ?  Seems like we could also have a method on 
KafkaConsumer


 public void register(MessageChooser messageChooser)

I don't have strong opinions regarding this. I like configs, i also

don't think it would be a problem to have both.



to make it more dynamic.


Also, the Samza MessageChooser interface has method


   /* Notify the chooser that a new envelope is available for a processing. */

void update(IncomingMessageEnvelope envelope)


and I’m wondering how this method would be translated to Kafka API.  In 
particular what corresponds to IncomingMessageEnvelope.

I think Samza uses the envelop abstraction as they support other sources

besides kafka aswell. They are more

on the spark end of things when it comes to different input types. I

don't have strong opinions but it feels like

we wouldn't need such a thing in the kafka consumer but just use a

regular ConsumerRecord or so.


Best,

--

   Nick
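
For illustration, a ConsumerRecord-based analogue of the Samza chooser discussed 
above might look like the following; this is purely hypothetical, as no such 
interface exists in the Kafka consumer API today:

import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface MessageChooser<K, V> {
    /* Notify the chooser that a new record is available for processing. */
    void update(ConsumerRecord<K, V> record);

    /* Return the next record to process, or null to wait for more input. */
    ConsumerRecord<K, V> choose();
}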










Re: [VOTE] KIP-349 Priorities for Source Topics

2018-08-20 Thread Thomas Becker
I agree with Jan. A strategy interface for choosing processing order is nice, 
and would hopefully be a step towards getting this in streams.

-Tommy

On Mon, 2018-08-20 at 12:52 +0200, Jan Filipiak wrote:

On 20.08.2018 00:19, Matthias J. Sax wrote:

@Nick: A KIP is only accepted if it got 3 binding votes, ie, votes from

committers. If you close the vote before that, the KIP would not be

accepted. Note that committers need to pay attention to a lot of KIPs

and it can take a while until people can look into it. Thanks for your

understanding.


@Jan: Can you give a little bit more context on your concerns? It's

unclear why you mean atm.

Just saying that we should peek at the Samza approach, it's a much more

powerful abstraction. We can ship a default MessageChooser

that looks at the topics' priority.

@Adam: anyone can vote :)




-Matthias


On 8/19/18 9:58 AM, Adam Bellemare wrote:

While I am not sure if I can or can’t vote, my question re: Jan’s comment is, 
“should we be implementing it as Samza does?”


I am not familiar with the drawbacks of the current approach vs how samza does 
it.


On Aug 18, 2018, at 5:06 PM, n...@afshartous.com wrote:



I only saw one vote on KIP-349, just checking to see if anyone else would like 
to vote before closing this out.

--

  Nick



On Aug 13, 2018, at 9:19 PM, n...@afshartous.com wrote:



Hi All,


Calling for a vote on KIP-349


https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics


--

 Nick













Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-07 Thread Thomas Becker
In typing up a scenario to illustrate my question, I think I found the answer 
;) We are not assuming timestamps will be strictly increasing within a topic 
and trying to make processing order deterministic even in the face of that. 
Thanks for making me think about it (or please correct me if I'm wrong).


On Tue, 2018-08-07 at 10:48 -0700, Matthias J. Sax wrote:

@Thomas, just to rephrase (from my understanding):


So in the scenario you describe, where one topic has

vastly lower throughput, you're saying that when the lower throughput topic

is fully caught up (no messages in the buffer), the task will idle rather

than using the timestamp of the last message it saw from that topic?


The idea of the KIP is to configure a max blocking time for tasks. This

allows to provide a 'grace period such that new data for the empty

partition can we received (if an upstream producer still writes data,

but with low throughput). After the blocking timeout expires (in case

throughput is too small or upstream producer died), the task will

process data even if there are empty input topic partitions (because we

cannot block forever in a stream processing scenario).


Not sure, what you mean by "using the timestamp of the last message"?

Using for what? The KIP is about processing records of partitions that

do have data if some other partitions are empty (and to allow users to

configure a max "blocking time" until processing is "forced").



-Matthias
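
For reference, the configurable blocking time described above later shipped in 
Kafka Streams as max.task.idle.ms; a minimal config sketch, with an arbitrarily 
chosen value:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Wait up to 5 seconds for data on empty input partitions before processing
// proceeds anyway; 0 disables waiting. The value trades latency for ordering.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);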



On 8/7/18 10:02 AM, Guozhang Wang wrote:

@Tommy


Yes that's the intent. Again note that the current behavior is indeed "just

using the timestamp of the last message I saw", and continue processing

what's in the buffer from other streams, but this may introduce

out-of-ordering.



Guozhang



On Tue, Aug 7, 2018 at 9:59 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:


Thanks Guozhang. So in the scenario you describe, where one topic has

vastly lower throughput, you're saying that when the lower throughput topic

is fully caught up (no messages in the buffer), the task will idle rather

than using the timestamp of the last message it saw from that topic?

Initially I was under the impression that this would only happen when the

task had not yet seen any messages from one of the partitions.


Regarding choosing, you are exactly right. This mechanism is pluggable in

Samza, and I'd like to see something similar done in Kafka Streams. The

timestamp based choosing policy is great and makes sense in a lot of

scenarios, but having something like a priority based policy would be very

nice for some of our usecases.


-Tommy


On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:


@Ted



Yes, I will update the KIP mentioning this as a separate consideration.




@Thomas



The idle period may be happening during the processing as well. Think: if


you are joining two streams with very different throughput traffic, say for


an extreme case, one stream comes in as 100K messages / sec, another comes


in as 1 message / sec. Then it could happen from time to time that we have


reached the tail of the low-traffic stream and do not have any data


received yet from that stream, while the other stream still have


unprocessed buffered data. Currently we will always go ahead and just


process the other stream's buffered data, but bear in mind that when we


eventually have received the data from the low-traffic stream we realized


that its timestamp is even smaller than what we already have processed, and


hence accidentally introduced out-of-ordering data.



What you described in stream-table joins is also a common case (e.g.


https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it

is


more related to a general theme, of "messaging choosing", in which Kafka


Streams today only allows timestamp-synchronization-based message

choosing.


Other mechanisms are requested as well, e.g. topic-priority based messaging


choosing, or type based (e.g. always prefer KTable over KStream). And this


is not only for Streams, but also for generally Consumer itself: e.g. Nick


has recently proposed this (


https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).


I think this topic itself should be discussed as a separate KIP, maybe for


both Streams and Consumer clients, and hence I intentionally avoid


overlapping with it and stays with a static messaging choosing mechanism in


my KIP.





Guozhang




On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:



This looks like a big step in the right direction IMO. So am I correct in


assuming this idle period would only come into play after startup when


waiting for initial records to be fetched? In other words, once we have


seen records from all topics and have established 

Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-07 Thread Thomas Becker
Thanks Guozhang. So in the scenario you describe, where one topic has vastly 
lower throughput, you're saying that when the lower throughput topic is fully 
caught up (no messages in the buffer), the task will idle rather than using the 
timestamp of the last message it saw from that topic? Initially I was under the 
impression that this would only happen when the task had not yet seen any 
messages from one of the partitions.

Regarding choosing, you are exactly right. This mechanism is pluggable in 
Samza, and I'd like to see something similar done in Kafka Streams. The 
timestamp based choosing policy is great and makes sense in a lot of scenarios, 
but having something like a priority based policy would be very nice for some 
of our usecases.

-Tommy

On Tue, 2018-08-07 at 09:30 -0700, Guozhang Wang wrote:

@Ted


Yes, I will update the KIP mentioning this as a separate consideration.



@Thomas


The idle period may be happening during the processing as well. Think: if

you are joining two streams with very different throughput traffic, say for

an extreme case, one stream comes in as 100K messages / sec, another comes

in as 1 message / sec. Then it could happen from time to time that we have

reached the tail of the low-traffic stream and do not have any data

received yet from that stream, while the other stream still have

unprocessed buffered data. Currently we will always go ahead and just

process the other stream's buffered data, but bear in mind that when we

eventually have received the data from the low-traffic stream we realized

that its timestamp is even smaller than what we already have processed, and

hence accidentally introduced out-of-ordering data.


What you described in stream-table joins is also a common case (e.g.

https://issues.apache.org/jira/browse/KAFKA-4113). Personally I think it is

more related to a general theme, of "messaging choosing", in which Kafka

Streams today only allows timestamp-synchronization-based message choosing.

Other mechanisms are requested as well, e.g. topic-priority based messaging

choosing, or type based (e.g. always prefer KTable over KStream). And this

is not only for Streams, but also for generally Consumer itself: e.g. Nick

has recently proposed this (

https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics).

I think this topic itself should be discussed as a separate KIP, maybe for

both Streams and Consumer clients, and hence I intentionally avoid

overlapping with it and stays with a static messaging choosing mechanism in

my KIP.




Guozhang



On Tue, Aug 7, 2018 at 4:55 AM, Thomas Becker <thomas.bec...@tivo.com> wrote:


This looks like a big step in the right direction IMO. So am I correct in

assuming this idle period would only come into play after startup when

waiting for initial records to be fetched? In other words, once we have

seen records from all topics and have established the stream time,

processing will not go idle again, right?


I still feel that timestamp semantics are not wanted in all cases.

Consider a simple stream-table join to augment incoming events where the

table data has been updated recently (and hence has a later timestamp than

some incoming events). Currently this will not be joined at all (assuming

older table records have been compacted) until the timestamps on events

start passing the table updates. For use cases like this I'd like to be

able to say always prefer processing the table backing topic if it has data

available, regardless of timestamp.


On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:


Hello all,



I would like to kick off a discussion on the following KIP, to allow users


control when a task can be processed based on its buffered records, and how


the stream time of a task be advanced.



https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization



This is related to one of the root causes of out-of-ordering data in Kafka


Streams. Any thoughts / comments on this topic is more than welcomed.




Thanks,


-- Guozhang















Re: [VOTE] KIP-346 - Improve LogCleaner behavior on error

2018-08-07 Thread Thomas Becker
+1 (non-binding)

We've hit issues with the log cleaner in the past, and this would be a great 
improvement.
On Tue, 2018-08-07 at 12:19 +0100, Stanislav Kozlovski wrote:

Hey everybody,

I'm starting a vote on KIP-346









Re: [DISCUSS] KIP-353: Allow Users to Configure Kafka Streams Timestamp Synchronization

2018-08-07 Thread Thomas Becker
This looks like a big step in the right direction IMO. So am I correct in 
assuming this idle period would only come into play after startup when waiting 
for initial records to be fetched? In other words, once we have seen records 
from all topics and have established the stream time, processing will not go 
idle again, right?

I still feel that timestamp semantics are not wanted in all cases. Consider a 
simple stream-table join to augment incoming events where the table data has 
been updated recently (and hence has a later timestamp than some incoming 
events). Currently this will not be joined at all (assuming older table records 
have been compacted) until the timestamps on events start passing the table 
updates. For use cases like this I'd like to be able to say always prefer 
processing the table backing topic if it has data available, regardless of 
timestamp.

On Fri, 2018-08-03 at 14:00 -0700, Guozhang Wang wrote:

Hello all,


I would like to kick off a discussion on the following KIP, to allow users

control when a task can be processed based on its buffered records, and how

the stream time of a task be advanced.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization


This is related to one of the root causes of out-of-ordering data in Kafka

Streams. Any thoughts / comments on this topic is more than welcomed.



Thanks,

-- Guozhang






Re: Builder Pattern for kafka-clients in 2.x ?

2018-07-05 Thread Thomas Becker
Personally, I like the idea of builders for the producer/consumer themselves, 
but I'm less enthusiastic about one for ProducerRecord. Mostly because I think 
the following is overly verbose/reads poorly:

producer.send(ProducerRecord.builder()
.topic("mytopic")
.key("Key")
.value("the-val")
.headers(myHeaderIterable)
.build());

as compared to:

producer.send(new ProducerRecord("mytopic", "Key", "the-val", 
myHeaderIterable));

I think constructor overloads are fine for small data classes like this. The 
producer/consumer clients themselves have a lot of options represented by
various configuration keys, and a builder pattern makes these easily 
discoverable in code.

-Tommy

On Wed, 2018-07-04 at 15:42 +0200, Matthias Wessendorf wrote:

Hi,


I was filing KAFKA-7059 ([1]) and sent a PR adding a new ctor:

--

public ProducerRecord(String topic, K key, V value, Iterable<Header> headers)

---


One reasonable comment on the PR was instead of doing constructor

overloading, why not working on a builder for the ProducerRecord class.


I think this is generally a nice idea I was wondering if there is much

interest in ?


Sample:

---

final ProducerRecord<String, String> myRecord = ProducerRecord.builder() // or an exposed builder

.topic("mytopic")

.key("Key")

.value("the-val")

.headers(myHeaderIterable)

.build();

---


While at it - instead of just offering a builder for the "ProducerRecord"

class, why not adding a builder for the "KafkaProducer" and "KafkaConsumer"

clazzes.


---

final KafkaProducer<String, String> myProducer = KafkaProducer.builder() // or an exposed builder clazz

.config(myProducerConfig)

.keySerializer(myStringSerializer)

.valueSerializer(myStringSerializer)

.build();

---


to make the above even nicer, I think the "ProducerConfig" (analogous to the

ConsumerConfig) configuration options could also be made accessible w/ this

fluent API - instead of properties/map, which is what now dominates the

creation of the Consumers/Producers.



Any thoughts?   If there is interest, I am happy to start a KIP w/ a first

draft of the suggested API!


Cheers,

Matthias


[1] https://issues.apache.org/jira/browse/KAFKA-7059









Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-06 Thread Thomas Becker
Could you clarify exactly what you mean by keeping the current distinction?

Actually, re-reading the KIP and JIRA, it's not clear that being able to 
specify a custom name is actually a requirement. If the goal is to control 
repartitioning and tune parallelism, maybe we can just sidestep this issue 
altogether by removing the ability to set a different name.

On Mon, 2017-11-06 at 16:51 +0100, Matthias J. Sax wrote:

That's a good point. In the current design, we strictly distinguish both.
For example, the reset tool deletes internal topics (starting with the
`<application.id>-` prefix and ending with either `-repartition` or
`-changelog`).

Thus, from my point of view, it would make sense to keep the current
distinction.

-Matthias

On 11/6/17 4:45 PM, Thomas Becker wrote:


I think this sounds good as well. It's worth clarifying whether topics that are 
named by the user but created by streams are considered "internal" topics also.

On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:

My idea was, to relax the requirement for through() that a topic must be
created manually before startup.

Thus, if no through() call is made, a (internal) topic is created the
same way we do it currently.

If one uses `through(String topicName)` we keep the current behavior and
require users to create the topic manually.

The reasoning is as follows: if a user creates a topic manually, a user
can just use it for repartitioning. As the topic is already there, there
is no need to specify any topic configs.

We add a new `through()` overload (details TBD) that allows to specify
topic configs and Streams create the topic with those configs.

Reasoning: the user doesn't want to manage the topic manually, thus it's still an
internal topic and Streams creates the topic name automatically as for
all other internal topics. However, users get some more control over
topic parameters like the number of partitions (we should discuss what other
configs would be useful).


Does this make sense?


-Matthias


On 11/5/17 1:21 AM, Jan Filipiak wrote:


Hi.


I'm not 100% up to date on what the version 1.0 DSL looks like ATM.
I just would argue that repartitioning should be its own API call like
through or something.
One can use through() or to() already to get this. I would argue one should
look there instead of overloads.

Best Jan

On 04.11.2017 16:01, Jeyhun Karimov wrote:


Dear community,

I would like to initiate discussion on KIP-221 [1] based on issue [2].
Please feel free to comment.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams

[2] https://issues.apache.org/jira/browse/KAFKA-6037



Cheers,
Jeyhun





















Re: [DISCUSS] KIP-221: Repartition Topic Hints in Streams

2017-11-06 Thread Thomas Becker
I think this sounds good as well. It's worth clarifying whether topics that are 
named by the user but created by streams are considered "internal" topics also.

On Sun, 2017-11-05 at 23:02 +0100, Matthias J. Sax wrote:

My idea was, to relax the requirement for through() that a topic must be
created manually before startup.

Thus, if no through() call is made, a (internal) topic is created the
same way we do it currently.

If one uses `through(String topicName)` we keep the current behavior and
require users to create the topic manually.

The reasoning is as follows: if a user creates a topic manually, a user
can just use it for repartitioning. As the topic is already there, there
is no need to specify any topic configs.

We add a new `through()` overload (details TBD) that allows to specify
topic configs and Streams create the topic with those configs.

Reasoning: the user doesn't want to manage the topic manually, thus it's still an
internal topic and Streams creates the topic name automatically as for
all other internal topics. However, users get some more control over
topic parameters like the number of partitions (we should discuss what other
configs would be useful).


Does this make sense?


-Matthias
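
A purely hypothetical sketch of what the new overload's parameter could carry; 
the thread leaves the details TBD, and every name below is invented rather than 
an actual Streams API:

import java.util.Map;

/* Invented: a hint object that a new through(RepartitionHint) overload might accept. */
public interface RepartitionHint {
    int numberOfPartitions();
    Map<String, String> topicConfigs(); // e.g. cleanup.policy, retention.ms
}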


On 11/5/17 1:21 AM, Jan Filipiak wrote:


Hi.


I'm not 100% up to date on what the version 1.0 DSL looks like ATM.
I just would argue that repartitioning should be its own API call like
through or something.
One can use through() or to() already to get this. I would argue one should
look there instead of overloads.

Best Jan

On 04.11.2017 16:01, Jeyhun Karimov wrote:


Dear community,

I would like to initiate discussion on KIP-221 [1] based on issue [2].
Please feel free to comment.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams

[2] https://issues.apache.org/jira/browse/KAFKA-6037



Cheers,
Jeyhun













Re: [DISCUSS] KIP-211: Revise Expiration Semantics of Consumer Group Offsets

2017-10-19 Thread Thomas Becker
I think it would be helpful to clarify what happens if consumers rejoin an 
empty group. I would presume that the expiration timer is stopped, and reset 
back to offsets.retention.minutes if the group becomes empty again, but the KIP 
doesn't say.

On Wed, 2017-10-18 at 16:45 -0700, Vahid S Hashemian wrote:

Hi all,

I created a KIP to address the group offset expiration issue reported in
KAFKA-4682:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets

Your feedback is welcome!

Thanks.
--Vahid







RE: GlobalKTable limitations

2017-05-25 Thread Thomas Becker
Hey Eno,
Thanks for the response. We have considered, but not yet tried that. One of the 
nice things about the GlobalKTable is that it fully "bootstraps" before the 
rest of the topology is started. But if part of the topology is itself 
generating the topic that backs the global table, it seems like that would 
effectively break. Also, doing this obviously requires re-materializing the 
data in a new topic.  To be fair, the topic we are building the table from has 
a bit of an unusual format, which is why I was trying to see if anyone else 
would think this was useful.

-Tommy


From: Eno Thereska [eno.there...@gmail.com]
Sent: Thursday, May 25, 2017 12:03 PM
To: dev@kafka.apache.org
Subject: Re: GlobalKTable limitations

Hi Thomas,

Have you considered doing the transformations on the topic, then outputting to 
another topic and then constructing the GlobalKTable from the latter?

The GlobalKTable has the limitations you mention since it was primarily 
designed for joins only. We should consider allowing a less restrictive 
interface if it makes sense.

Eno

> On 25 May 2017, at 14:48, Thomas Becker <tobec...@tivo.com> wrote:
>
> We need to do a series of joins against a KTable that we can't co-
> partition with the stream, so we're looking at GlobalKTable.  But the
> topic backing the table is not ideally keyed for the sort of lookups
> this particular processor needs to do. Unfortunately, GlobalKTable is
> very limited in that you can only build one with the exact keys/values
> from the backing topic. I'd like to be able to perform various
> transformations on the topic before materializing the table.  I'd
> envision it looking something like the following:
>
> builder.globalTable(keySerde, valueSerde, topicName)
>.filter((k, v) -> k.isFoo())
>.map((k, v) -> new KeyValue<>(k.getBar(), v.getBaz()))
>.build(tableKeySerde, tableValueSerde, storeName);
>
> Is this something that has been considered or that others would find
> useful?
>





GlobalKTable limitations

2017-05-25 Thread Thomas Becker
We need to do a series of joins against a KTable that we can't co-
partition with the stream, so we're looking at GlobalKTable.  But the
topic backing the table is not ideally keyed for the sort of lookups
this particular processor needs to do. Unfortunately, GlobalKTable is
very limited in that you can only build one with the exact keys/values
from the backing topic. I'd like to be able to perform various
transformations on the topic before materializing the table.  I'd
envision it looking something like the following:

builder.globalTable(keySerde, valueSerde, topicName)
.filter((k, v) -> k.isFoo())
.map((k, v) -> new KeyValue<>(k.getBar(), v.getBaz()))
.build(tableKeySerde, tableValueSerde, storeName);

Is this something that has been considered or that others would find
useful?



Re: [VOTE] KIP-138: Change punctuate semantics

2017-05-10 Thread Thomas Becker
+1

On Wed, 2017-05-10 at 10:52 +0100, Michal Borowiecki wrote:
> Hi all,
>
> This vote thread has gone quiet.
>
> In view of the looming cut-off for 0.11.0.0 I'd like to encourage
> anyone
> who cares about this to have a look and vote and/or comment on this
> proposal.
>
> Thanks,
>
> Michał
>
>
> On 07/05/17 10:16, Eno Thereska wrote:
> >
> > +1 (non binding)
> >
> > Thanks
> > Eno
> > >
> > > On May 6, 2017, at 11:01 PM, Bill Bejeck wrote:
> > >
> > > +1
> > >
> > > Thanks,
> > > Bill
> > >
> > > > On Sat, May 6, 2017 at 5:58 PM, Matthias J. Sax wrote:
> > >
> > > >
> > > > +1
> > > >
> > > > Thanks a lot for this KIP!
> > > >
> > > > -Matthias
> > > >
> > > > On 5/6/17 10:18 AM, Michal Borowiecki wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > Given I'm not seeing any contentious issues remaining on the
> > > > > discussion
> > > > > thread, I'd like to initiate the vote for:
> > > > >
> > > > > KIP-138: Change punctuate semantics
> > > > >
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Michał


GlobalKTable not checkpointing offsets but reusing store

2017-05-09 Thread Thomas Becker
I'm experimenting with a streams application that does a KStream-
GlobalKTable join, and I'm seeing some unexpected behavior when re-
running the application. First, it does not appear that the offsets in
the topic backing the GlobalKTable are being checkpointed to a file as
I expected. This results in the RocksDB store being rebuilt every time I
run the app, which is time consuming. After some investigation, it
appears that the offset map that is written to the checkpoint file is
only updated once the application is at a steady-state. So the initial
state of the global table after restore is never checkpointed unless
additional messages come in. This results in the entire topic backing
the global table being re-read and re-inserted into the same RocksDB
instance every time the app starts, which makes the store very large
(since it then contains multiple copies of every message) and triggers
lots of compactions. Is this intended? Should I open a JIRA?




Exiting a streams app at end of stream?

2017-05-03 Thread Thomas Becker
We have had a number of situations where we need to migrate data in a
Kafka topic to a new topic that is keyed differently. Stream processing
is a good fit for this use-case with one exception: there is no easy
way to know when your "migration job" is finished. Has any thought been
given to adding an "end of stream" notion to Kafka Streams, and a
corresponding mode to exit the application when all input streams have
hit it?
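
Absent such a mode, one workaround with a plain consumer is to snapshot the end 
offsets at startup and exit once every assigned partition has reached them; a 
minimal sketch, assuming assign() is used and with the re-keying step elided:

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static void runUntilEndOfStream(KafkaConsumer<byte[], byte[]> consumer) {
    // Snapshot the high watermarks once; records appended later are deliberately ignored.
    Map<TopicPartition, Long> target = consumer.endOffsets(consumer.assignment());
    boolean caughtUp = false;
    while (!caughtUp) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(rec -> { /* re-key and produce to the new topic here */ });
        caughtUp = target.entrySet().stream()
                .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
    }
}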



Re: [VOTE] KIP-123: Allow per stream/table timestamp extractor

2017-04-24 Thread Thomas Becker
+1 (non-binding)

On Tue, 2017-02-28 at 08:59 +, Jeyhun Karimov wrote:
> Dear community,
>
> I'd like to start the vote for KIP-123:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
>
>
> Cheers,
> Jeyhun


Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-04 Thread Thomas Becker
Yeah I like PunctuationType much better; I just threw Time out there
more as a strawman than an actual suggestion ;) I still think it's
worth considering what this buys us over an additional callback. I
foresee a number of punctuate implementations following this pattern:

public void punctuate(PunctuationType type) {
switch (type) {
case EVENT_TIME:
methodA();
break;
case SYSTEM_TIME:
methodB();
break;
}
}

I guess one advantage of this approach is we could add additional
punctuation types later in a backwards compatible way (like event count
as you mentioned).

-Tommy


On Tue, 2017-04-04 at 11:10 -0700, Matthias J. Sax wrote:
> That sounds promising.
>
> I am just wondering if `Time` is the best name. Maybe we want to add
> other non-time based punctuations at some point later. I would
> suggest
>
> enum PunctuationType {
>   EVENT_TIME,
>   SYSTEM_TIME,
> }
>
> or similar. Just to keep the door open -- it's easier to add new
> stuff
> if the name is more generic.
>
>
> -Matthias
>
>
> On 4/4/17 5:30 AM, Thomas Becker wrote:
> >
> > I agree that the framework providing and managing the notion of
> > stream
> > time is valuable and not something we would want to delegate to the
> > tasks. I'm not entirely convinced that a separate callback (option
> > C)
> > is that messy (it could just be a default method with an empty
> > implementation), but if we wanted a single API to handle both
> > cases,
> > how about something like the following?
> >
> > enum Time {
> >STREAM,
> >CLOCK
> > }
> >
> > Then on ProcessorContext:
> > context.schedule(Time time, long interval)  // We could allow this
> > to
> > be called once for each value of time to mix approaches.
> >
> > Then the Processor API becomes:
> > punctuate(Time time) // time here denotes which schedule resulted
> > in
> > this call.
> >
> > Thoughts?
> >
> >
> > On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
> > >
> > > Thanks a lot for the KIP Michal,
> > >
> > > I was thinking about the four options you proposed in more
> > > details
> > > and
> > > these are my thoughts:
> > >
> > > (A) You argue, that users can still "punctuate" on event-time via
> > > process(), but I am not sure if this is possible. Note, that
> > > users
> > > only
> > > get record timestamps via context.timestamp(). Thus, users would
> > > need
> > > to
> > > track the time progress per partition (based on the partitions
> > > they
> > > observe via context.partition(). (This alone puts a huge burden
> > > on
> > > the
> > > user by itself.) However, users are not notified at startup what
> > > partitions are assigned, and users are not notified when
> > > partitions
> > > get
> > > revoked. Because this information is not available, it's not
> > > possible
> > > to
> > > "manually advance" stream-time, and thus event-time punctuation
> > > within
> > > process() seems not to be possible -- or do you see a way to get
> > > it
> > > done? And even if, it might still be too clumsy to use.
> > >
> > > (B) This does not allow to mix both approaches, thus limiting
> > > what
> > > users
> > > can do.
> > >
> > > (C) This should give all flexibility we need. However, just
> > > adding
> > > one
> > > more method seems to be a solution that is too simple (cf my
> > > comments
> > > below).
> > >
> > > (D) This might be hard to use. Also, I am not sure how a user
> > > could
> > > enable system-time and event-time punctuation in parallel.
> > >
> > >
> > >
> > > Overall options (C) seems to be the most promising approach to
> > > me.
> > > Because I also favor a clean API, we might keep current
> > > punctuate()
> > > as-is, but deprecate it -- so we can remove it at some later
> > > point
> > > when
> > > people use the "new punctuate API".
> > >
> > >
> > > Couple of follow up questions:
> > >
> > > - I am wondering, if we should have two callback methods or just
> > > one
> > > (ie, a unified for system and event time punctuation or one for
> > > each?).
> > >
> > > - If we have one, how can the user figure out, which cond

Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-04 Thread Thomas Becker
I agree that the framework providing and managing the notion of stream
time is valuable and not something we would want to delegate to the
tasks. I'm not entirely convinced that a separate callback (option C)
is that messy (it could just be a default method with an empty
implementation), but if we wanted a single API to handle both cases,
how about something like the following?

enum Time {
   STREAM,
   CLOCK
}

Then on ProcessorContext:
context.schedule(Time time, long interval)  // We could allow this to
be called once for each value of time to mix approaches.

Then the Processor API becomes:
punctuate(Time time) // time here denotes which schedule resulted in
this call.

Thoughts?


On Mon, 2017-04-03 at 22:44 -0700, Matthias J. Sax wrote:
> Thanks a lot for the KIP Michal,
>
> I was thinking about the four options you proposed in more details
> and
> these are my thoughts:
>
> (A) You argue, that users can still "punctuate" on event-time via
> process(), but I am not sure if this is possible. Note, that users
> only
> get record timestamps via context.timestamp(). Thus, users would need
> to
> track the time progress per partition (based on the partitions they
> observe via context.partition(). (This alone puts a huge burden on
> the
> user by itself.) However, users are not notified at startup what
> partitions are assigned, and users are not notified when partitions
> get
> revoked. Because this information is not available, it's not possible
> to
> "manually advance" stream-time, and thus event-time punctuation
> within
> process() seems not to be possible -- or do you see a way to get it
> done? And even if, it might still be too clumsy to use.
>
> (B) This does not allow to mix both approaches, thus limiting what
> users
> can do.
>
> (C) This should give all flexibility we need. However, just adding
> one
> more method seems to be a solution that is too simple (cf my comments
> below).
>
> (D) This might be hard to use. Also, I am not sure how a user could
> enable system-time and event-time punctuation in parallel.
>
>
>
> Overall options (C) seems to be the most promising approach to me.
> Because I also favor a clean API, we might keep current punctuate()
> as-is, but deprecate it -- so we can remove it at some later point
> when
> people use the "new punctuate API".
>
>
> Couple of follow up questions:
>
> - I am wondering, if we should have two callback methods or just one
> (ie, a unified for system and event time punctuation or one for
> each?).
>
> - If we have one, how can the user figure out, which condition did
> trigger?
>
> - How would the API look like, for registering different punctuate
> schedules? The "type" must be somehow defined?
>
> - We might want to add "complex" schedules later on (like, punctuate
> on
> every 10 seconds event-time or 60 seconds system-time whatever comes
> first). I don't say we should add this right away, but we might want
> to
> define the API in a way, that it allows extensions like this later
> on,
> without redesigning the API (ie, the API should be designed
> extensible)
>
> - Did you ever consider count-based punctuation?
>
>
> I understand, that you would like to solve a simple problem, but we
> learned from the past, that just "adding some API" quickly leads to a
> not very well defined API that needs time consuming clean up later on
> via other KIPs. Thus, I would prefer to get a holistic punctuation
> KIP
> with this from the beginning on to avoid later painful redesign.
>
>
>
> -Matthias
>
>
>
> On 4/3/17 11:58 AM, Michal Borowiecki wrote:
> >
> > Thanks Thomas,
> >
> > I'm also wary of changing the existing semantics of punctuate, for
> > backward compatibility reasons, although I like the conceptual
> > simplicity of that option.
> >
> > Adding a new method to me feels safer but, in a way, uglier. I
> > added
> > this to the KIP now as option (C).
> >
> > The TimestampExtractor mechanism is actually more flexible, as it
> > allows you to return any value; you're not limited to event time
> > or system time (although I don't see an actual use case where you
> > might need anything other than those two). Hence I also proposed
> > the option of allowing users to, effectively, decide what "stream
> > time" means for them given the presence or absence of messages,
> > much like they can decide what message time means for them using
> > the TimestampExtractor. What do you think about that? This is
> > probably the most flexible option but also the most complicated.
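> >
> > For instance, a minimal sketch (assuming the 0.10.2-style API) of
> > an extractor that makes stream time follow the wall clock:
> >
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >     import org.apache.kafka.streams.processor.TimestampExtractor;
> >
> >     public class WallClockDrivenExtractor implements TimestampExtractor {
> >         @Override
> >         public long extract(ConsumerRecord<Object, Object> record,
> >                             long previousTimestamp) {
> >             // stream time advances with the wall clock regardless of
> >             // message timestamps -- but still only when messages arrive
> >             return System.currentTimeMillis();
> >         }
> >     }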
> >
> > Al

Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-03 Thread Thomas Becker
Although I fully agree we need a way to trigger periodic processing
that is independent of whether and when messages arrive, I'm not sure
I like the idea of changing the existing semantics across the board.
What if we added an additional callback to Processor that can be
scheduled similarly to punctuate() but is always called at fixed,
wall-clock-based intervals? This way you wouldn't have to give up the
notion of stream time to be able to do periodic processing.
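
To make that concrete, a rough sketch -- the method name is just a
placeholder, nothing I'm attached to:

    // hypothetical addition, for illustration only
    public interface Processor<K, V> {
        void init(ProcessorContext context);
        void process(K key, V value);
        void punctuate(long streamTime);        // existing, stream-time based
        void punctuateWallClock(long wallTime); // new, fixed wall-clock intervals
        void close();
    }

The runtime would invoke punctuateWallClock() on a timer regardless
of whether any messages have arrived, while punctuate() would keep
its current stream-time semantics.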

On Mon, 2017-04-03 at 10:34 +0100, Michal Borowiecki wrote:
> Hi all,
>
> I have created a draft for KIP-138: Change punctuate semantics
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics>.
>
> Appreciating that there can be different views on system-time vs.
> event-time semantics for punctuation, depending on the use case and
> on how important backwards compatibility of any such change is, I've
> left it quite open and hope to fill in more info as the discussion
> progresses.
>
> Thanks,
> Michal
--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com






Old producer slow/no recovery on broker failure

2017-02-09 Thread Thomas Becker
We ran into an incident a while back where one of our broker machines
abruptly went down (AWS is fun). While the leadership transitions and
so forth seemed to work correctly with the remaining brokers, our
producers hung shortly thereafter. I should point out that we are using
the old Scala producer in async mode. What happened was that the
producer's queue filled up and the SyncProducer on the other end was
blocked in a write() call, waiting for ACKs that would never come. My
understanding of blocking I/O on the JVM is that such a call blocks
until the OS gives up on the TCP connection, which can take as long
as 30 minutes.
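
For what it's worth, my understanding is that on Linux it's the TCP
retransmission limit, not keepalive, that bounds a write() blocked on
unacknowledged data -- corrections welcome. Hypothetically, something
like:

    # cap retransmissions so a stuck connection fails in minutes
    # rather than tens of minutes (the Linux default is 15)
    sysctl -w net.ipv4.tcp_retries2=8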

As a remedy, we're first going to set queue.enqueue.timeout.ms to a
positive value, as we're willing to lose some of these particular
messages to avoid blocking user requests. But this won't actually
make the producer recover more quickly. Is lowering these OS-level
TCP timeouts the right thing here? Also, can someone comment on
whether this behavior would also happen with the new producer? We
want to move to it, but that hasn't been a priority.
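
For reference, the change we're planning looks roughly like this in
the producer config -- the 5s value is just a first guess:

    # old Scala producer properties
    producer.type=async
    # fail sends after 5s when the queue is full, instead of blocking
    # forever (the default of -1 blocks indefinitely)
    queue.enqueue.timeout.ms=5000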

--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com



