Re: Streams, Kafka windows

2020-01-18 Thread John Roesler
Good idea! I’ll make a note to do it when I’m at a computer. 

On Sat, Jan 18, 2020, at 21:51, Guozhang Wang wrote:
> Hey John,
> 
> Since this is a common question and I've seen many users asking about
> window semantics like this, could you file a JIRA ticket for creating a
> wiki page like Join Semantics (
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
> to summarize the windowing operations like this too?
> 
> Guozhang
> 
> On Sat, Jan 18, 2020 at 3:22 PM John Roesler  wrote:
> 
> > Glad it helped!
> > -John
> >
> > On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > > Hi John,
> > >
> > > Thank you for your assistance!
> > > Your example helped me a lot and I understand kafka-streams more
> > > clearly now.
> > > Have a nice weekend :)
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> > > Thu, 16 Jan 2020 at 19:29, John Roesler :
> > >
> > > > Hi Viktor,
> > > >
> > > > I’m starting to wonder what exactly “duplicate” means in this context.
> > Can
> > > > you elaborate?
> > > >
> > > > In case it helps, with your window definition, if I send a record with
> > > > timestamp 20, it would actually belong to three different windows:
> > > > [0,30)
> > > > [10,40)
> > > > [20,50)
> > > >
> > > > Because of this, you would (correctly) see three output records for
> > that
> > > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > > they’d have different keys:
> > > >
> > > > Input:
> > > > Key1: Val1 @ timestamp:20
> > > >
> > > > Output:
> > > > Windowed<Key1, [0,30)>: 1
> > > > Windowed<Key1, [10,40)>: 1
> > > > Windowed<Key1, [20,50)>: 1
> > > >
> > > > Any chance that explains your observation?
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > >
> > > >
> > > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > > Hi John,
> > > > >
> > > > > Thanks for answering my questions!
> > > > > I observe behavior which I can not understand.
> > > > > The code is working, but when the delay between records is larger
> > > > > than the window duration I receive duplicated records.
> > > > > With the code below I received duplicated records in the output
> > > > > kstream.
> > > > > The count of duplicate records is always 3. If I change
> > > > > duration/advanceBy, the count of duplicated records changes too.
> > > > > Do you have any ideas why duplicated records are received in the
> > > > > output kstream?
> > > > >
> > > > > KStream<Windowed<String>, Long> windowedStream = source
> > > > >     .groupByKey()
> > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > > >     .count()
> > > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >     .toStream();
> > > > >
> > > > >
> > > > > Best regards,
> > > > > Viktor Markvardt
> > > > >
> > > > > Thu, 16 Jan 2020 at 04:59, John Roesler :
> > > > >
> > > > > > Hi Viktor,
> > > > > >
> > > > > > I’m not sure why you get two identical outputs in response to a
> > single
> > > > > > record. Regardless, since you say that you want to get a single,
> > final
> > > > > > result for the window and you expect multiple inputs to the
> > windows,
> > > > you
> > > > > > need Suppression.
> > > > > >
> > > > > > My guess is that you just sent one record to try it out and didn’t
> > see
> > > > any
> > > > > > output? This is expected. Just as the window boundaries are
> > defined by
> > > > the
> > > > > > time stamps of the records, not by the current system time,
> > > > suppression is
> > > > > > governed by the timestamp of the records. I.e., a thirty-second
> > window
> > > > is
> > > > > > not actually closed until you see a new record with a timestamp
> > thirty
> > > > > > seconds later.
> > > > > >
> > > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > > timestamps. If the first record has timestamp T, then you should
> > see an
> > > > > > output when you pass in a record with timestamp T+30.
> > > > > >
> > > > > > Important note: there is a built-in grace period that delays the
> > > > output of
> > > > > > final results after the window ends. For complicated reasons, the
> > > > default
> > > > > > is 24 hours! So you would actually not see an output until you
> > send a
> > > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> > set
> > > > the
> > > > > > grace period on TimeWindows to zero for your testing. You can
> > increase
> > > > it
> > > > > > later if you want to tolerate some late-arriving records.
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > My name is Viktor. I'm currently working with Kafka streams and
> > have
> > > > > > > several questions about Kafka and I can not find answers in the
> > > > official
> > > > > > > docs.
> > > > > > >
> > > > > > > 1. Why does the suppress functionality not work with Hopping 

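For reference, a minimal stand-alone sketch of the window arithmetic John
describes above. It mirrors the semantics of hopping TimeWindows (size 30s,
advance 10s) for illustration only; it is not Kafka's actual implementation:

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowDemo {

    // Start times of all hopping windows [start, start + size) that contain
    // the given timestamp. Window starts are aligned to multiples of the
    // advance, as with Kafka Streams' hopping windows.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long earliestPossibleStart = Math.max(0, timestampMs - sizeMs + 1);
        // Round up to the first window start aligned to the advance.
        long firstStart = ((earliestPossibleStart + advanceMs - 1) / advanceMs) * advanceMs;
        for (long start = firstStart; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(30).toMillis();
        long advance = Duration.ofSeconds(10).toMillis();
        // A record at timestamp 20s belongs to [0,30), [10,40), and [20,50):
        System.out.println(windowStartsFor(Duration.ofSeconds(20).toMillis(), size, advance));
        // prints [0, 10000, 20000]
    }
}

One input record thus produces one update per overlapping window, which is why
three outputs for a single record are expected rather than duplicates.
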
Re: Streams, Kafka windows

2020-01-18 Thread Guozhang Wang
Hey John,

Since this is a common question and I've seen many users asking about
window semantics like this, could you file a JIRA ticket for creating a
wiki page like Join Semantics (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
to summarize the windowing operations like this too?

Guozhang

On Sat, Jan 18, 2020 at 3:22 PM John Roesler  wrote:

> Glad it helped!
> -John
>
> On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> > Hi John,
> >
> > Thank you for your assistance!
> > Your example helped me a lot and I understand kafka-streams more clearly
> > now.
> > Have a nice weekend :)
> >
> > Best regards,
> > Viktor Markvardt
> >
> > Thu, 16 Jan 2020 at 19:29, John Roesler :
> >
> > > Hi Viktor,
> > >
> > > I’m starting to wonder what exactly “duplicate” means in this context.
> Can
> > > you elaborate?
> > >
> > > In case it helps, with your window definition, if I send a record with
> > > timestamp 20, it would actually belong to three different windows:
> > > [0,30)
> > > [10,40)
> > > [20,50)
> > >
> > > Because of this, you would (correctly) see three output records for
> that
> > > one input, but the outputs wouldn’t be “duplicates” properly, because
> > > they’d have different keys:
> > >
> > > Input:
> > > Key1: Val1 @ timestamp:20
> > >
> > > Output:
> > > Windowed<Key1, [0,30)>: 1
> > > Windowed<Key1, [10,40)>: 1
> > > Windowed<Key1, [20,50)>: 1
> > >
> > > Any chance that explains your observation?
> > >
> > > Thanks,
> > > John
> > >
> > >
> > >
> > > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > > Hi John,
> > > >
> > > > Thanks for answering my questions!
> > > > I observe behavior which I can not understand.
> > > > The code is working, but when the delay between records is larger
> > > > than the window duration I receive duplicated records.
> > > > With the code below I received duplicated records in the output
> > > > kstream.
> > > > The count of duplicate records is always 3. If I change
> > > > duration/advanceBy, the count of duplicated records changes too.
> > > > Do you have any ideas why duplicated records are received in the
> > > > output kstream?
> > > >
> > > > KStream<Windowed<String>, Long> windowedStream = source
> > > >     .groupByKey()
> > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > > >     .count()
> > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream();
> > > >
> > > >
> > > > Best regards,
> > > > Viktor Markvardt
> > > >
> > > > Thu, 16 Jan 2020 at 04:59, John Roesler :
> > > >
> > > > > Hi Viktor,
> > > > >
> > > > > I’m not sure why you get two identical outputs in response to a
> single
> > > > > record. Regardless, since you say that you want to get a single,
> final
> > > > > result for the window and you expect multiple inputs to the
> windows,
> > > you
> > > > > need Suppression.
> > > > >
> > > > > My guess is that you just sent one record to try it out and didn’t
> see
> > > any
> > > > > output? This is expected. Just as the window boundaries are
> defined by
> > > the
> > > > > time stamps of the records, not by the current system time,
> > > suppression is
> > > > > governed by the timestamp of the records. I.e., a thirty-second
> window
> > > is
> > > > > not actually closed until you see a new record with a timestamp
> thirty
> > > > > seconds later.
> > > > >
> > > > >  Maybe try just sending a sequence of updates with incrementing
> > > > > timestamps. If the first record has timestamp T, then you should
> see an
> > > > > output when you pass in a record with timestamp T+30.
> > > > >
> > > > > Important note: there is a built-in grace period that delays the
> > > output of
> > > > > final results after the window ends. For complicated reasons, the
> > > default
> > > > > is 24 hours! So you would actually not see an output until you
> send a
> > > > > record with timestamp T+30+(24 hours) ! I strongly recommend you
> set
> > > the
> > > > > grace period on TimeWindows to zero for your testing. You can
> increase
> > > it
> > > > > later if you want to tolerate some late-arriving records.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > > Hi,
> > > > > >
> > > > > > My name is Viktor. I'm currently working with Kafka streams and
> have
> > > > > > several questions about Kafka and I can not find answers in the
> > > official
> > > > > > docs.
> > > > > >
> > > > > > 1. Why does the suppress functionality not work with Hopping
> > > > > > windows? How can I make it work?
> > > > > >
> > > > > > Example of the code:
> > > > > >
> > > > > > KStream<Windowed<String>, String> finalStream = source
> > > > > >     .groupByKey()
> > > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > > >     .reduce((aggValue, newValue) -> newValue,
> > > > > > 

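To make the grace-period point above concrete, here is a sketch using
TopologyTestDriver from kafka-streams-test-utils (available since Kafka 2.4).
The topic names and key/value types are illustrative assumptions, not part of
the thread:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressDemo {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(30))
                .advanceBy(Duration.ofSeconds(10))
                .grace(Duration.ZERO)) // zero grace so final results emit promptly
            .count()
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .foreach((windowedKey, count) -> System.out.println(windowedKey + ": " + count));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                "input", new StringSerializer(), new StringSerializer());
            input.pipeInput("Key1", "Val1", 20_000L); // no output: windows still open
            input.pipeInput("Key1", "Val2", 80_000L); // stream time passes 50s; closed windows emit
        }
    }
}

With grace zero, the record at 80s closes [0,30), [10,40), and [20,50), so the
final counts for Key1 appear at once; with the default 24-hour grace, nothing
would appear until stream time reached each window end plus 24 hours.
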
Re: KIP-560 Discuss

2020-01-18 Thread Sang wn Lee
Thank you

I understand you.

1. The admin client has the topic list.
2. An applicationId can only have one stream, so it won't be a problem!
3. For example, --input-topic [regex]
Allowing a regex solves some inconvenience.


On 2020/01/18 18:15:23, Gwen Shapira  wrote: 
> I am not sure I follow. Afaik:
> 
> 1. Topics don't include client ID information
> 2. Even if they did, the same ID could be used for topics that are not
> Kafka Streams input
> 
> The regex idea sounds doable, but I'm not sure it solves much?
> 
> 
> On Sat, Jan 18, 2020, 7:12 AM Sang wn Lee  wrote:
> 
> > Thank you
> > Gwen Shapira!
> > We'll add a flag to clear all topics by clientId
> > It is ‘reset-all-external-topics’
> >
> > I also want to use regex on the input topic flag to clear all matching
> > topics.
> >
> > On 2020/01/17 19:29:09, Gwen Shapira  wrote:
> > > Seems like a very nice improvement to me. But I have to admit that I
> > > don't understand how this will work - how could you infer the input
> > > topics?
> > >
> > > On Thu, Jan 16, 2020 at 10:03 AM Sang wn Lee 
> > wrote:
> > > >
> > > > Hello,
> > > >
> > > > Starting this thread to discuss KIP-560:
> > > > wiki link :
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool
> > > >
> > > > I'm newbie
> > > > I would like to receive feedback on the following features!
> > > >
> > > >
> > >
> >
> 

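For what a regex on --input-topic might look like internally, here is a rough
sketch (the flag handling and the example pattern are illustrative
assumptions, not the final KIP-560 design):

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class InputTopicMatcher {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Fetch all topic names from the cluster...
            Set<String> allTopics = admin.listTopics().names().get();
            // ...and keep only those matching the user-supplied pattern.
            Pattern inputTopicPattern = Pattern.compile(args.length > 0 ? args[0] : "my-app-input-.*");
            List<String> matched = allTopics.stream()
                .filter(t -> inputTopicPattern.matcher(t).matches())
                .sorted()
                .collect(Collectors.toList());
            System.out.println("Topics to reset: " + matched);
        }
    }
}

As Gwen notes above, a pattern can easily over-match topics that are not
actually Streams inputs, so such a flag would likely need a confirmation step.
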

[jira] [Created] (KAFKA-9454) Relax transaction.id security requirement

2020-01-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9454:
--

 Summary: Relax transaction.id security requirement
 Key: KAFKA-9454
 URL: https://issues.apache.org/jira/browse/KAFKA-9454
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


As we are no longer required to configure transactional.id on the client, we 
could piggy-back the security check on the consumer group id instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9453) Make transaction.id optional in group mode EOS

2020-01-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9453:
--

 Summary: Make transaction.id optional in group mode EOS
 Key: KAFKA-9453
 URL: https://issues.apache.org/jira/browse/KAFKA-9453
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen


After KIP-447, one of the big improvements is that we no longer require the 
single-writer guarantee, so the user doesn't have to configure a unique 
transactional.id for transaction safety.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

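For context on what these two tickets relax, this is the classic exactly-once
producer flow that currently requires a transactional.id unique per producer
(a minimal sketch; the broker address and topic names are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Currently this must be unique per producer to fence zombie writers;
        // KAFKA-9453 proposes making it optional in group mode EOS, and
        // KAFKA-9454 would base the security check on the consumer group id.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-txn-0");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("output-topic", "key", "value"));
            producer.commitTransaction();
        }
    }
}
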

Re: Streams, Kafka windows

2020-01-18 Thread John Roesler
Glad it helped!
-John

On Sat, Jan 18, 2020, at 12:27, Viktor Markvardt wrote:
> Hi John,
> 
> Thank you for your assistance!
> Your example helped me a lot and I understand kafka-streams more clearly now.
> Have a nice weekend :)
> 
> Best regards,
> Viktor Markvardt
> 
> Thu, 16 Jan 2020 at 19:29, John Roesler :
> 
> > Hi Viktor,
> >
> > I’m starting to wonder what exactly “duplicate” means in this context. Can
> > you elaborate?
> >
> > In case it helps, with your window definition, if I send a record with
> > timestamp 20, it would actually belong to three different windows:
> > [0,30)
> > [10,40)
> > [20,50)
> >
> > Because of this, you would (correctly) see three output records for that
> > one input, but the outputs wouldn’t be “duplicates” properly, because
> > they’d have different keys:
> >
> > Input:
> > Key1: Val1 @ timestamp:20
> >
> > Output:
> > Windowed<Key1, [0,30)>: 1
> > Windowed<Key1, [10,40)>: 1
> > Windowed<Key1, [20,50)>: 1
> >
> > Any chance that explains your observation?
> >
> > Thanks,
> > John
> >
> >
> >
> > On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > > Hi John,
> > >
> > > Thanks for answering my questions!
> > > I observe behavior which I can not understand.
> > > The code is working, but when the delay between records is larger than
> > > the window duration I receive duplicated records.
> > > With the code below I received duplicated records in the output kstream.
> > > The count of duplicate records is always 3. If I change duration/advanceBy,
> > > the count of duplicated records changes too.
> > > Do you have any ideas why duplicated records are received in the output
> > > kstream?
> > >
> > > KStream<Windowed<String>, Long> windowedStream = source
> > >     .groupByKey()
> > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> > >     .count()
> > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > >     .toStream();
> > >
> > >
> > > Best regards,
> > > Viktor Markvardt
> > >
> > > Thu, 16 Jan 2020 at 04:59, John Roesler :
> > >
> > > > Hi Viktor,
> > > >
> > > > I’m not sure why you get two identical outputs in response to a single
> > > > record. Regardless, since you say that you want to get a single, final
> > > > result for the window and you expect multiple inputs to the windows,
> > you
> > > > need Suppression.
> > > >
> > > > My guess is that you just sent one record to try it out and didn’t see
> > any
> > > > output? This is expected. Just as the window boundaries are defined by
> > the
> > > > time stamps of the records, not by the current system time,
> > suppression is
> > > > governed by the timestamp of the records. I.e., a thirty-second window
> > is
> > > > not actually closed until you see a new record with a timestamp thirty
> > > > seconds later.
> > > >
> > > >  Maybe try just sending a sequence of updates with incrementing
> > > > timestamps. If the first record has timestamp T, then you should see an
> > > > output when you pass in a record with timestamp T+30.
> > > >
> > > > Important note: there is a built-in grace period that delays the
> > output of
> > > > final results after the window ends. For complicated reasons, the
> > default
> > > > is 24 hours! So you would actually not see an output until you send a
> > > > record with timestamp T+30+(24 hours) ! I strongly recommend you set
> > the
> > > > grace period on TimeWindows to zero for your testing. You can increase
> > it
> > > > later if you want to tolerate some late-arriving records.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > > Hi,
> > > > >
> > > > > My name is Viktor. I'm currently working with Kafka streams and have
> > > > > several questions about Kafka and I can not find answers in the
> > official
> > > > > docs.
> > > > >
> > > > > 1. Why does the suppress functionality not work with Hopping
> > > > > windows? How can I make it work?
> > > > >
> > > > > Example of the code:
> > > > >
> > > > > KStream<Windowed<String>, String> finalStream = source
> > > > >     .groupByKey()
> > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > > >     .reduce((aggValue, newValue) -> newValue,
> > > > >         Materialized.with(Serdes.String(), Serdes.String()))
> > > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > > >     .toStream();
> > > > >
> > > > > finalStream.print(Printed.toSysOut());
> > > > > finalStream.to(outputTopic);
> > > > >
> > > > > After I run the code above, the output stream is empty. There were
> > > > > no errors/exceptions.
> > > > > NOTE: With a Tumbling Window the code works as expected.
> > > > > Maybe I am simply using it incorrectly?
> > > > >
> > > > > 2. Why are there duplicates in the output stream with Hopping
> > > > > windows (without suppress)?
> > > > > E.g., I send one record in the input kstream with 

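One way to see that the three outputs really carry different window keys is to
flatten each Windowed key into a readable string before writing out. A sketch
built around the thread's topology (the helper class and output topic name are
illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Windowed;

public class WindowedKeyFormat {
    // Flattens each windowed key into "key@[start,end)" so the per-window
    // outputs for one input record are visibly distinct, not "duplicates".
    static KStream<String, Long> withReadableKeys(KStream<Windowed<String>, Long> windowedStream) {
        return windowedStream.map((windowedKey, count) -> KeyValue.pair(
            windowedKey.key() + "@[" + windowedKey.window().start()
                + "," + windowedKey.window().end() + ")",
            count));
    }

    static void writeOut(KStream<Windowed<String>, Long> windowedStream) {
        withReadableKeys(windowedStream)
            .to("counts-output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}

Applied to the windowedStream in this thread, a record at timestamp 20s would
yield Key1@[0,30000), Key1@[10000,40000), and Key1@[20000,50000) rather than
three identical-looking lines.
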
Re: Streams, Kafka windows

2020-01-18 Thread Viktor Markvardt
Hi John,

Thank you for your assistance!
Your example helped me a lot and I understand kafka-streams more clearly now.
Have a nice weekend :)

Best regards,
Viktor Markvardt

Thu, 16 Jan 2020 at 19:29, John Roesler :

> Hi Viktor,
>
> I’m starting to wonder what exactly “duplicate” means in this context. Can
> you elaborate?
>
> In case it helps, with your window definition, if I send a record with
> timestamp 20, it would actually belong to three different windows:
> [0,30)
> [10,40)
> [20,50)
>
> Because of this, you would (correctly) see three output records for that
> one input, but the outputs wouldn’t be “duplicates” properly, because
> they’d have different keys:
>
> Input:
> Key1: Val1 @ timestamp:20
>
> Output:
> Windowed<Key1, [0,30)>: 1
> Windowed<Key1, [10,40)>: 1
> Windowed<Key1, [20,50)>: 1
>
> Any chance that explains your observation?
>
> Thanks,
> John
>
>
>
> On Thu, Jan 16, 2020, at 10:18, Viktor Markvardt wrote:
> > Hi John,
> >
> > Thanks for answering my questions!
> > I observe behavior which I can not understand.
> > The code is working, but when the delay between records is larger than
> > the window duration I receive duplicated records.
> > With the code below I received duplicated records in the output kstream.
> > The count of duplicate records is always 3. If I change duration/advanceBy,
> > the count of duplicated records changes too.
> > Do you have any ideas why duplicated records are received in the output
> > kstream?
> >
> > KStream<Windowed<String>, Long> windowedStream = source
> >     .groupByKey()
> >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ZERO).advanceBy(Duration.ofSeconds(10)))
> >     .count()
> >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> >     .toStream();
> >
> >
> > Best regards,
> > Viktor Markvardt
> >
> > Thu, 16 Jan 2020 at 04:59, John Roesler :
> >
> > > Hi Viktor,
> > >
> > > I’m not sure why you get two identical outputs in response to a single
> > > record. Regardless, since you say that you want to get a single, final
> > > result for the window and you expect multiple inputs to the windows,
> you
> > > need Suppression.
> > >
> > > My guess is that you just sent one record to try it out and didn’t see
> any
> > > output? This is expected. Just as the window boundaries are defined by
> the
> > > time stamps of the records, not by the current system time,
> suppression is
> > > governed by the timestamp of the records. I.e., a thirty-second window
> is
> > > not actually closed until you see a new record with a timestamp thirty
> > > seconds later.
> > >
> > >  Maybe try just sending a sequence of updates with incrementing
> > > timestamps. If the first record has timestamp T, then you should see an
> > > output when you pass in a record with timestamp T+30.
> > >
> > > Important note: there is a built-in grace period that delays the
> output of
> > > final results after the window ends. For complicated reasons, the
> default
> > > is 24 hours! So you would actually not see an output until you send a
> > > record with timestamp T+30+(24 hours) ! I strongly recommend you set
> the
> > > grace period on TimeWindows to zero for your testing. You can increase
> it
> > > later if you want to tolerate some late-arriving records.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Tue, Jan 14, 2020, at 10:41, Viktor Markvardt wrote:
> > > > Hi,
> > > >
> > > > My name is Viktor. I'm currently working with Kafka streams and have
> > > > several questions about Kafka and I can not find answers in the
> official
> > > > docs.
> > > >
> > > > 1. Why does the suppress functionality not work with Hopping
> > > > windows? How can I make it work?
> > > >
> > > > Example of the code:
> > > >
> > > > KStream<Windowed<String>, String> finalStream = source
> > > >     .groupByKey()
> > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(10)))
> > > >     .reduce((aggValue, newValue) -> newValue,
> > > >         Materialized.with(Serdes.String(), Serdes.String()))
> > > >     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
> > > >     .toStream();
> > > >
> > > > finalStream.print(Printed.toSysOut());
> > > > finalStream.to(outputTopic);
> > > >
> > > > After I run the code above, the output stream is empty. There were no
> > > > errors/exceptions.
> > > > NOTE: With a Tumbling Window the code works as expected.
> > > > Maybe I am simply using it incorrectly?
> > > >
> > > > 2. Why are there duplicates in the output stream with Hopping windows
> > > > (without suppress)?
> > > > E.g., I send one record to the input kstream with a Hopping window
> > > > (duration=30s, advanceBy=2s) but get two identical records (duplicates)
> > > > in the output kstream.
> > > > Is that expected behavior? If so, how can I filter out these
> > > > duplicates?
> > > >
> > > > 3. Mainly I'm trying to solve this problem:
> > > > I have a kstream with events inside, and events can be repeated
> > > > (duplicates).
> > > 

[DISCUSS] KIP-552: Add new cached authorizer: change the dim of cache

2020-01-18 Thread Steven Lu
Hello all,

I wrote a KIP about adding a new cached authorizer; this improvement can 
greatly reduce CPU usage in the long run.
Please take a look: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-553%3A+Using+AclCommand%2Cavoid+call+the+global+method+loadcache+in+SimpleAclAuthorizer

Thanks,
Steven


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Navinder Brar
Sure John, I will update StoreQueryParams with static factory methods.
@Ted, we would need to create a taskId only when a user provides one single 
partition. When a user wants to query all partitions of an instance, the 
current code is good enough: we iterate over all stream threads and go over 
all taskIds to match the store. But when a user requests a store for a single 
partition, we need to create a taskId out of that partition and the store 
name (using the InternalTopologyBuilder class) and match it against the 
taskIds belonging to that instance. I will add the code in the KIP. 

On Sunday, 19 January, 2020, 12:47:08 am IST, Ted Yu  
wrote:  
 
 Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors,
> and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List<Integer> partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use
> withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or
> a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for
> now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>  

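Putting the suggestions in this thread together, the config object being
discussed would look roughly like this (a sketch of the API under discussion;
the final class in KIP-562 may well differ):

public class StoreQueryParams {

    private Integer partition;          // null means unspecified
    private boolean allLocalPartitions; // explicit "all partitions" request

    private StoreQueryParams() {}

    // Static factory, following the convention John describes.
    public static StoreQueryParams storeQueryParams() {
        return new StoreQueryParams();
    }

    public StoreQueryParams withPartition(final int partition) {
        this.partition = partition;
        this.allLocalPartitions = false;
        return this;
    }

    public StoreQueryParams withAllLocalPartitions() {
        this.partition = null;
        this.allLocalPartitions = true;
        return this;
    }

    // Null if unset or if "all partitions" was requested.
    public Integer getPartition() {
        return partition;
    }

    public boolean getAllLocalPartitions() {
        return allLocalPartitions;
    }
}

Usage would then read, e.g., StoreQueryParams.storeQueryParams().withPartition(3)
or StoreQueryParams.storeQueryParams().withAllLocalPartitions().
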
Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Ted Yu
Looking at the current KIP-562:

bq. Create a taskId from the combination of store name and partition
provided by the user

I wonder if a single taskId would be used for the “all partitions” case.
If so, we need to choose a numerical value for the partition portion of the
taskId.

On Sat, Jan 18, 2020 at 10:27 AM John Roesler  wrote:

> Thanks, Ted!
>
> This makes sense, but it seems like we should lean towards explicit
> semantics in the public API. ‘-1’ meaning “all partitions” is reasonable,
> but not explicit. That’s why I suggested the Boolean for “all partitions”.
> I guess this also means getPartition() should either throw an exception or
> return null if the partition is unspecified.
>
> Thanks,
> John
>
> On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> > I wonder if the following two methods can be combined:
> >
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> >
> > into:
> >
> > Integer getPartition() // would be null if unset or -1 if "all
> partitions"
> >
> > Cheers
> >
> > On Fri, Jan 17, 2020 at 9:56 PM John Roesler 
> wrote:
> >
> > > Thanks, Navinder!
> > >
> > > I took a look at the KIP.
> > >
> > > We tend to use static factory methods instead of public constructors,
> and
> > > also builders for optional parameters.
> > >
> > > Given that, I think it would be more typical to have a factory method:
> > > storeQueryParams()
> > >
> > > and also builders for setting the optional parameters, like:
> > > withPartitions(List<Integer> partitions)
> > > withStaleStoresEnabled()
> > > withStaleStoresDisabled()
> > >
> > >
> > > I was also thinking this over today, and it really seems like there are
> > > two main cases for specifying partitions,
> > > 1. you know exactly what partition you want. In this case, you'll only
> > > pass in a single number.
> > > 2. you want to get a handle on all the stores for this instance (the
> > > current behavior). In this case, it's not clear how to use
> withPartitions
> > > to achieve the goal, unless you want to apply a-priori knowledge of the
> > > number of partitions in the store. We could consider an empty list, or
> a
> > > null, to indicate "all", but that seems a little complicated.
> > >
> > > Thus, maybe it would actually be better to eschew withPartitions for
> now
> > > and instead just offer:
> > > withPartition(int partition)
> > > withAllLocalPartitions()
> > >
> > > and the getters:
> > > Integer getPartition() // would be null if unset or if "all partitions"
> > > boolean getAllLocalPartitions() // true/false if "all partitions"
> requested
> > >
> > > Sorry, I know I'm stirring the pot, but what do you think about this?
> > >
> > > Oh, also, the KIP is missing the method signature for the new
> > > KafkaStreams#store overload.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > > Hi all,
> > > > I have created a new
> > > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > > Please take a look if you get a chance.
> > > > ~Navinder
> > >
> >
>


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread John Roesler
Thanks, Ted!

This makes sense, but it seems like we should lean towards explicit semantics 
in the public API. ‘-1’ meaning “all partitions” is reasonable, but not 
explicit. That’s why I suggested the Boolean for “all partitions”. I guess this 
also means getPartition() should either throw an exception or return null if 
the partition is unspecified. 

Thanks,
John

On Sat, Jan 18, 2020, at 08:43, Ted Yu wrote:
> I wonder if the following two methods can be combined:
> 
> Integer getPartition() // would be null if unset or if "all partitions"
> boolean getAllLocalPartitions() // true/false if "all partitions" requested
> 
> into:
> 
> Integer getPartition() // would be null if unset or -1 if "all partitions"
> 
> Cheers
> 
> On Fri, Jan 17, 2020 at 9:56 PM John Roesler  wrote:
> 
> > Thanks, Navinder!
> >
> > I took a look at the KIP.
> >
> > We tend to use static factory methods instead of public constructors, and
> > also builders for optional parameters.
> >
> > Given that, I think it would be more typical to have a factory method:
> > storeQueryParams()
> >
> > and also builders for setting the optional parameters, like:
> > withPartitions(List<Integer> partitions)
> > withStaleStoresEnabled()
> > withStaleStoresDisabled()
> >
> >
> > I was also thinking this over today, and it really seems like there are
> > two main cases for specifying partitions,
> > 1. you know exactly what partition you want. In this case, you'll only
> > pass in a single number.
> > 2. you want to get a handle on all the stores for this instance (the
> > current behavior). In this case, it's not clear how to use withPartitions
> > to achieve the goal, unless you want to apply a-priori knowledge of the
> > number of partitions in the store. We could consider an empty list, or a
> > null, to indicate "all", but that seems a little complicated.
> >
> > Thus, maybe it would actually be better to eschew withPartitions for now
> > and instead just offer:
> > withPartition(int partition)
> > withAllLocalPartitions()
> >
> > and the getters:
> > Integer getPartition() // would be null if unset or if "all partitions"
> > boolean getAllLocalPartitions() // true/false if "all partitions" requested
> >
> > Sorry, I know I'm stirring the pot, but what do you think about this?
> >
> > Oh, also, the KIP is missing the method signature for the new
> > KafkaStreams#store overload.
> >
> > Thanks!
> > -John
> >
> > On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > > Hi all,
> > > I have created a new
> > > KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > > Please take a look if you get a chance.
> > > ~Navinder
> >
>


Re: KIP-560 Discuss

2020-01-18 Thread Gwen Shapira
I am not sure I follow. Afaik:

1. Topics don't include client ID information
2. Even if they did, the same ID could be used for topics that are not Kafka
Streams input

The regex idea sounds doable, but I'm not sure it solves much?


On Sat, Jan 18, 2020, 7:12 AM Sang wn Lee  wrote:

> Thank you
> Gwen Shapira!
> We'll add a flag to clear all topics by clientId
> It is ‘reset-all-external-topics’
>
> I also want to use regex on the input topic flag to clear all matching
> topics.
>
> On 2020/01/17 19:29:09, Gwen Shapira  wrote:
> > Seems like a very nice improvement to me. But I have to admit that I
> > don't understand how this will work - how could you infer the input
> > topics?
> >
> > On Thu, Jan 16, 2020 at 10:03 AM Sang wn Lee 
> wrote:
> > >
> > > Hello,
> > >
> > > Starting this thread to discuss KIP-560:
> > > wiki link :
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool
> > >
> > > I'm newbie
> > > I would like to receive feedback on the following features!
> > >
> > >
> >
>


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread John Roesler
Hi Navinder,

Thanks for the explanation. Your thinking makes sense, but those classes are 
actually only constructed internally by Streams, never by users of the API. I 
believe all the public config objects are constructed with factory methods. 

What you say for the partition selection sounds perfect!

Thanks,
John

On Sat, Jan 18, 2020, at 01:52, Navinder Brar wrote:
> Hi John,
> Thanks for looking into it. 
> On using constructors rather than static factory methods, I was following 
> the convention of the classes currently available to users, such as 
> LagInfo and KeyQueryMetadata. Let me know if it's still preferable to 
> change StoreQueryParams to a static factory method, and I will update the 
> KIP.
> On List<Integer> partitions vs Integer partition, I agree sending an empty 
> list to return all is cumbersome, so I will change the class to take a 
> single partition when users want to fetch the store for one partition; if 
> it's unset we will return all available partitions.
> On KafkaStreams#store overload - noted. I will update the KIP.
> Thanks,
> Navinder
> 
> On Saturday, 18 January, 2020, 11:26:30 am IST, John Roesler 
>  wrote:  
>  
>  Thanks, Navinder!
> 
> I took a look at the KIP.
> 
> We tend to use static factory methods instead of public constructors, 
> and also builders for optional parameters.
> 
> Given that, I think it would be more typical to have a factory method:
> storeQueryParams() 
> 
> and also builders for setting the optional parameters, like:
> withPartitions(List<Integer> partitions)
> withStaleStoresEnabled()
> withStaleStoresDisabled()
> 
> 
> I was also thinking this over today, and it really seems like there are 
> two main cases for specifying partitions,
> 1. you know exactly what partition you want. In this case, you'll only 
> pass in a single number.
> 2. you want to get a handle on all the stores for this instance (the 
> current behavior). In this case, it's not clear how to use 
> withPartitions to achieve the goal, unless you want to apply a-priori 
> knowledge of the number of partitions in the store. We could consider 
> an empty list, or a null, to indicate "all", but that seems a little 
> complicated.
> 
> Thus, maybe it would actually be better to eschew withPartitions for 
> now and instead just offer:
> withPartition(int partition)
> withAllLocalPartitions()
> 
> and the getters:
> Integer getPartition() // would be null if unset or if "all partitions"
> boolean getAllLocalPartitions() // true/false if "all partitions" requested
> 
> Sorry, I know I'm stirring the pot, but what do you think about this?
> 
> Oh, also, the KIP is missing the method signature for the new 
> KafkaStreams#store overload.
> 
> Thanks!
> -John
> 
> On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > Hi all,
> > I have created a new 
> > KIP: 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > Please take a look if you get a chance.
> > ~Navinder


Re: KIP-560 Discuss

2020-01-18 Thread Sang wn Lee
Thank you
Gwen Shapira!
We'll add a flag to clear all topics by clientId
It is ‘reset-all-external-topics’

I also want to use regex on the input topic flag to clear all matching topics.

On 2020/01/17 19:29:09, Gwen Shapira  wrote: 
> Seems like a very nice improvement to me. But I have to admit that I
> don't understand how this will work - how could you infer the input
> topics?
> 
> On Thu, Jan 16, 2020 at 10:03 AM Sang wn Lee  wrote:
> >
> > Hello,
> >
> > Starting this thread to discuss KIP-560:
> > wiki link :
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-560%3A+Auto+infer+external+topic+partitions+in+stream+reset+tool
> >
> > I'm newbie
> > I would like to receive feedback on the following features!
> >
> >
> 


Re: [DISCUSS] : KIP-562: Allow fetching a key from a single partition rather than iterating over all the stores on an instance

2020-01-18 Thread Ted Yu
I wonder if the following two methods can be combined:

Integer getPartition() // would be null if unset or if "all partitions"
boolean getAllLocalPartitions() // true/false if "all partitions" requested

into:

Integer getPartition() // would be null if unset or -1 if "all partitions"

Cheers

On Fri, Jan 17, 2020 at 9:56 PM John Roesler  wrote:

> Thanks, Navinder!
>
> I took a look at the KIP.
>
> We tend to use static factory methods instead of public constructors, and
> also builders for optional parameters.
>
> Given that, I think it would be more typical to have a factory method:
> storeQueryParams()
>
> and also builders for setting the optional parameters, like:
> withPartitions(List<Integer> partitions)
> withStaleStoresEnabled()
> withStaleStoresDisabled()
>
>
> I was also thinking this over today, and it really seems like there are
> two main cases for specifying partitions,
> 1. you know exactly what partition you want. In this case, you'll only
> pass in a single number.
> 2. you want to get a handle on all the stores for this instance (the
> current behavior). In this case, it's not clear how to use withPartitions
> to achieve the goal, unless you want to apply a-priori knowledge of the
> number of partitions in the store. We could consider an empty list, or a
> null, to indicate "all", but that seems a little complicated.
>
> Thus, maybe it would actually be better to eschew withPartitions for now
> and instead just offer:
> withPartition(int partition)
> withAllLocalPartitions()
>
> and the getters:
> Integer getPartition() // would be null if unset or if "all partitions"
> boolean getAllLocalPartitions() // true/false if "all partitions" requested
>
> Sorry, I know I'm stirring the pot, but what do you think about this?
>
> Oh, also, the KIP is missing the method signature for the new
> KafkaStreams#store overload.
>
> Thanks!
> -John
>
> On Fri, Jan 17, 2020, at 08:07, Navinder Brar wrote:
> > Hi all,
> > I have created a new
> > KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance
> > Please take a look if you get a chance.
> > ~Navinder
>


Build failed in Jenkins: kafka-2.4-jdk8 #129

2020-01-18 Thread Apache Jenkins Server
See 


Changes:

[matthias] MINOR: move "Added/Removed sensor" log messages to TRACE (#7502)


--
[...truncated 5.49 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 

[jira] [Created] (KAFKA-9452) Add new cached authorizer: change the dim of cache

2020-01-18 Thread Steven Lu (Jira)
Steven Lu created KAFKA-9452:


 Summary: Add new cached authorizer: change the dim of cache
 Key: KAFKA-9452
 URL: https://issues.apache.org/jira/browse/KAFKA-9452
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Steven Lu


Similar to https://issues.apache.org/jira/browse/KAFKA-5261:

We hit the same performance issue that is described in PR 
[#3756|https://github.com/apache/kafka/pull/3756] in our production 
environment. Hence, we revised the authorization mechanism; our revision 
has the following optimizations:

1. Build a cache for authorization results, which avoids recomputing them. 
The authorization result is fetched from the cache if the same result has 
already been computed, rather than being computed again.
2. Unlike PR 3756, when we build the authorization result cache, we key it 
by resource first. This way, authorization is recomputed only when the ACLs 
change for a specific resource. Compared to that PR, the frequency of 
recomputation can be reduced significantly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

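A minimal sketch of the resource-keyed cache the ticket describes: results are
cached per resource, and an ACL change for a resource invalidates only that
resource's entries. All names here are illustrative, not Kafka's actual
Authorizer API:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ResourceKeyedAuthorizerCache {

    // resource -> (principal#operation -> allowed)
    private final Map<String, Map<String, Boolean>> cache = new ConcurrentHashMap<>();

    boolean authorize(String principal, String operation, String resource) {
        return cache
            .computeIfAbsent(resource, r -> new ConcurrentHashMap<>())
            .computeIfAbsent(principal + "#" + operation,
                k -> evaluateAcls(principal, operation, resource));
    }

    // Called when the ACLs of a specific resource change: only that
    // resource's cached decisions are dropped, not the whole cache.
    void onAclChange(String resource) {
        cache.remove(resource);
    }

    private boolean evaluateAcls(String principal, String operation, String resource) {
        // Placeholder for the expensive ACL evaluation the cache avoids.
        return false;
    }
}
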

Re: [DISCUSS] KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies

2020-01-18 Thread David Jacot
Hi all,

FYI - I will start a vote on Monday if there are no comments.

Regards,
David

On Wed, 15 Jan 2020 at 13:48, David Jacot  wrote:

> Hi all,
>
> I just posted KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-559%3A+Make+the+Kafka+Protocol+Friendlier+with+L7+Proxies
>
> Have a look and let me know what you think.
>
> Best,
> David
>
>
>