Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce

2017-12-05 Thread Guozhang Wang
Hello Matt,

Thanks for writing up the KIP. I made a pass over it and here are a few
minor comments. I think you can consider starting a voting thread for this
KIP while addressing them.

1) We do not have the "ProductionExceptionHandler" interface defined on the
wiki page, though it is sort of clear that it is a one-function interface
taking the record and the exception. Could you add it?

2) A quick question about your example code: where would the "logger"
object be created? Note that implementations of this interface have to
provide a no-arg constructor, so the logger would either be created there or
declared as a static field of the class; in the latter case you would not
be able to log which instance is throwing this error (we may have multiple
producers within a single instance, even within a thread). Just a reminder
to consider in your implementation.
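
For illustration, here is a minimal sketch of such a handler with a per-instance logger. The interface shape simply follows the one-method contract discussed in this thread and is hypothetical, not the final public API:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical one-method handler interface, as discussed in this thread.
    interface ProductionExceptionHandler {
        enum Response { CONTINUE, FAIL }
        Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
    }

    // Example implementation: the logger is an instance field created in the
    // required no-arg constructor (rather than a static field), and the handler
    // instance is included in the log line so multiple producers can be told apart.
    class LoggingContinueHandler implements ProductionExceptionHandler {
        private final Logger logger;

        public LoggingContinueHandler() {
            this.logger = LoggerFactory.getLogger(LoggingContinueHandler.class);
        }

        @Override
        public Response handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
            logger.warn("{}: failed to produce record to topic {}; continuing",
                    this, record.topic(), exception);
            return Response.CONTINUE;
        }
    }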


Guozhang

On Tue, Dec 5, 2017 at 3:15 PM, Matthias J. Sax 
wrote:

> Thanks a lot for the update! Great write-up! Very clearly explained what
> the change will look like!
>
> Looks good to me. No further comments from my side.
>
>
> -Matthias
>
>
> On 12/5/17 9:14 AM, Matt Farmer wrote:
> > I have updated this KIP accordingly.
> >
> > Can you please take a look and let me know if what I wrote looks correct
> to
> > you?
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 210+-+Provide+for+custom+error+handling++when+Kafka+
> Streams+fails+to+produce
> >
> > Thanks!
> >
> > Matt
> >
> >
> > On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote:
> >
> > Hey Matthias, thanks for getting back to me.
> >
> > That's fine. But if we add it to `test` package, we don't need to talk
> > about it in the KIP. `test` is not public API.
> >
> > Yes, that makes sense. It was in the KIP originally because I was, at one
> > point, planning on including it. We can remove it now that we’ve decided
> we
> > won’t include it in the public API.
> >
> > Understood. That makes sense. We should explain this clearly in the KIP
> > and maybe log all other following exceptions at DEBUG level?
> >
> >
> > I thought it was clear in the KIP, but I can go back and double check my
> > wording and revise it to try and make it clearer.
> >
> > I’ll take a look at doing more work on the KIP and the Pull Request
> > tomorrow.
> >
> > Thanks again!
> >
> > On December 4, 2017 at 5:50:33 PM, Matthias J. Sax (
> matth...@confluent.io)
> > wrote:
> >
> > Hey,
> >
> > About your questions:
> >
>  Acknowledged, so is ProducerFencedException the only kind of
> exception I
>  need to change my behavior on? Or are there other types I need to
> > check? Is
>  there a comprehensive list somewhere?
> >
> > I cannot think of any other atm. We should list all fatal exceptions for
> > which we don't call the handler and explain why (exception is "global"
> > and will affect all other records, too | ProducerFenced is self-healing).
> >
> > We started to collect and categorize exceptions here (not completed yet):
> > https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Architecture#KafkaStreamsArchitecture-TypesofExceptions
> > :
> >
> > This list should be a good starting point though.
> >
> >> I include it in the test package because I have tests that assert that
> if
> >> the record collector impl encounters an Exception and receives a
> CONTINUE
> >> that it actually does CONTINUE.
> >
> > That's fine. But if we add it to `test` package, we don't need to talk
> > about it in the KIP. `test` is not public API.
> >
> >> I didn't want to invoke the handler in places where the CONTINUE or FAIL
> >> result would just be ignored. Presumably, after a FAIL has been
> returned,
> >> subsequent exceptions are likely to be repeats or noise from my
> >> understanding of the code paths. If this is incorrect we can revisit.
> >
> > Understood. That makes sense. We should explain this clearly in the KIP
> > and maybe log all other following exceptions at DEBUG level?
> >
> >
> > -Matthias
> >
> >
> > On 12/1/17 11:43 AM, Matt Farmer wrote:
> >> Bump! It's been three days here and I haven't seen any further feedback.
> >> Eager to get this resolved, approved, and merged. =)
> >>
> >> On Tue, Nov 28, 2017 at 9:53 AM Matt Farmer  wrote:
> >>
> >>> Hi there, sorry for the delay in responding. Last week had a holiday
> and
> >>> several busy work days in it so I'm just now getting around to
> > responding.
> >>>
>  We would only exclude
>  exceptions Streams can handle itself (like ProducerFencedException) --
>  thus, if the handler has code to react to this, it would not be bad,
> as
>  this code is just never called.
> >>> [...]
>  Thus, I am still in favor of not calling the
> ProductionExceptionHandler
>  for fatal exception.
> >>>
> >>> Acknowledged, so is ProducerFencedException the only kind of exception
> I
> >>> need to change my behavior on? Or are there other types I need to
> check?
> > Is
> >>> there a comprehensive list somewhere?
> >>>
>  About 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Dong Lin
Sounds good. I don't think there is any concern with using Long.MIN_VALUE to
indicate that the timestamp is not available.

As Matthias also mentioned, using Long.MIN_VALUE to indicate a missing
timestamp seems better than overloading the -1 semantics. Do you want to update
the "NO_TIMESTAMP (−1) problem" section in the KIP? It may also be useful
to briefly mention the alternative solution we discussed (I realized that
Ted also mentioned this alternative).
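
For illustration, a minimal sketch of the check under that convention (not existing Kafka code; the class and constant names are made up):

    // Sketch only: Long.MIN_VALUE marks "no timestamp", so every other value,
    // including negative ones, is treated as a real event time.
    public class TimestampSentinelSketch {
        static final long NO_TIMESTAMP = Long.MIN_VALUE;

        static boolean hasTimestamp(long timestamp) {
            return timestamp != NO_TIMESTAMP;
        }

        public static void main(String[] args) {
            System.out.println(hasTimestamp(-1L));            // true: 1 ms before the epoch
            System.out.println(hasTimestamp(Long.MIN_VALUE)); // false: timestamp not set
        }
    }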

Thanks,
Dong

On Tue, Dec 5, 2017 at 8:26 PM, Boerge Svingen  wrote:

>
> Thank you for the suggestion. We considered this before. It works, but
> it’s a hack, and we would be providing a bad user experience for our
> consumers if we had to explain, “if you want to start consuming in 2014,
> you have to pretend to want 2214”.
>
> We would rather solve the underlying problem. These are perfectly valid
> timestamps, and I can’t see any reason why Kafka shouldn’t support them - I
> don’t think using `Long.MIN_VALUE` instead of -1 would necessarily add
> complexity here?
>
>
> Thanks,
> Boerge.
>
>
>
> > On 2017-12-05, at 21:36, Dong Lin  wrote:
> >
> > Hey Boerge,
> >
> > Thanks for the blog link. I will read this blog later.
> >
> > Here is another alternative solution which may be worth thinking. We know
> > that the Unix time 0 corresponds to January 1, 1970. Let's say the
> earliest
> > time you may want to use as the timestamp of the Kafka message is within
> X
> > milliseconds before the January 1, 1970. Then you can add X to the
> > timestamp before you produce Kafka message. And you can also make similar
> > conversion when you use `offsetsForTimes()` or after you consume
> messages.
> > This seems to address your use-case without introducing negative
> timestamp.
> >
> > IMO, this solution requires a bit more logic in your application code.
> But
> > it keeps the Kafka timestamp logic simple and we reserve the capability
> to
> > use timestamp -1 for messages without timestamp for most Kafka users who
> do
> > not need negative timestamp. Do you think this would be a good
> alternative
> > solution?
> >
> > Thanks,
> > Dong
> >
> >
> > On Tue, Dec 5, 2017 at 5:39 PM, Boerge Svingen 
> wrote:
> >
> >>
> >> Yes. To provide a little more detail, we are using Kafka to store
> >> everything ever published by The New York Times, and to make this
> content
> >> available to a range of systems and applications. Assets are published
> to
> >> Kafka chronologically, so that consumers can seek to any point in time
> and
> >> start consuming from there, like Konstantin is describing, all the way
> back
> >> to our beginning in 1851.
> >>
> >> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/ <
> >> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/>
> >> has more information on the use case.
> >>
> >>
> >> Thanks,
> >> Boerge.
> >>
> >>
> >> --
> >>
> >> Boerge Svingen
> >> Director of Engineering
> >> The New York Times
> >>
> >>
> >>
> >>
> >>> On 2017-12-05, at 19:35, Dong Lin  wrote:
> >>>
> >>> Hey Konstantin,
> >>>
> >>> According to KIP-32 the timestamp is also used for log rolling and log
> >>> retention. Therefore, unless broker is configured to never delete any
> >>> message based on time, messages produced with negative timestamp in
> your
> >>> use-case will be deleted by the broker anyway. Do you actually plan to
> >> use
> >>> Kafka as a persistent storage system that never delete messages?
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>>
> >>>
> >>>
> >>> On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin <
> >> chuhlo...@gmail.com>
> >>> wrote:
> >>>
>  Hi Dong,
> 
>  Currently we are storing historical timestamp in the message.
> 
>  What we are trying to achieve is to make it possible to do Kafka
> lookup
>  by timestamp. Ideally I would do `offsetsForTimes` to find articles
>  published
>  in 1910s (if we are storing articles on the log).
> 
>  So first two suggestions aren't really covering our use-case.
> 
>  We could create a new timestamp type like "HistoricalTimestamp" or
>  "MaybeNegativeTimestamp".
>  And the only difference between this one and CreateTime is that it
> could
>  be negative.
>  I tend to use CreateTime for this purpose because it's easier to
>  understand from
>  user perspective as a timestamp which publisher can set.
> 
>  Thanks,
>  Konstantin
> 
> > On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> >
> > Hey Konstantin,
> >
> > Thanks for the KIP. I have a few questions below.
> >
> > Strictly speaking Kafka actually allows you to store historical data.
> >> And
> > user are free to encode arbitrary timestamp field in their Kafka
> >> message.
> > For example, your Kafka message can currently have Json or Avro
> format
>  and
> > you can put a timestamp field 

Re: Reply: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead metrics to KafkaConsumer

2017-12-05 Thread Guozhang Wang
Hello Xi,

You can see that in o.a.k.common.metrics.Sensor, we allow constructors to
pass in one or more "parent" Sensors of the constructed Sensor. Behind the
scenes, when a child sensor's metrics are updated, the updates are
propagated all the way up to its parents and ancestors (you can check out
the source code for the implementation). On different clients we have been
using this to build many hierarchical sensors, like per-dest-broker vs.
all-dest-broker metrics on the selector, etc.
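
For illustration, a minimal sketch of that parent/child propagation using the public org.apache.kafka.common.metrics API (the sensor and metric names are illustrative, not the ones the KIP will register):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Min;

    public class SensorHierarchySketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Client-level sensor that aggregates across all partitions.
            Sensor allPartitions = metrics.sensor("records-lead-min-all");
            allPartitions.add(metrics.metricName("records-lead-min", "example-group"), new Min());

            // Per-partition sensor constructed with the client-level sensor as its parent.
            Sensor partition0 = metrics.sensor("records-lead-min.topicA-0", allPartitions);
            partition0.add(metrics.metricName("records-lead-min.topicA-0", "example-group"), new Min());

            // Recording on the child also updates the parent, so the client-level
            // minimum stays in sync without a separate record() call.
            partition0.record(42.0);
        }
    }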

My understanding is that the cross-all-partitions "records-lead-min" will
be constructed as the parent of all the per-partition "records-lead-min",
is that true?


Guozhang

On Mon, Dec 4, 2017 at 11:26 PM, Hu Xi  wrote:

> Guozhang,
>
>
> Thanks for the vote and comments. I am not sure if I fully understand the
> parent metrics here. This KIP will introduce a client-level metric named
> 'records-lead-min' and three per-partition metrics tagged with
> topic and partition. Is that the child-parent relationship you mean?
>
>
> 
> From: Guozhang Wang
> Sent: December 5, 2017 15:16
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
> metrics to KafkaConsumer
>
> Thanks Hu Xi,
>
> I made a pass over the KIP and it lgtm. +1.
>
> Just a clarification question: for the cross-partition "records-lead-min"
> metric, would that be implemented as a parent metric of the per-partition
> metrics?
>
>
> Guozhang
>
>
> On Mon, Dec 4, 2017 at 3:07 PM, Dong Lin  wrote:
>
> > +1 (non-binding)
> >
> > On Wed, Nov 29, 2017 at 7:05 PM, Hu Xi  wrote:
> >
> > > Hi all,
> > >
> > > As I didn't see any further discussion around this KIP, I'd like to
> start
> > > voting.
> > >
> > > KIP documentation:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 223+-+Add+per-topic+min+lead+and+per-partition+lead+
> > > metrics+to+KafkaConsumer
> > >
> > >
> > >
> > > Cheers,
> > >
> > > huxihx
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2017-12-05 Thread Matthias J. Sax
Thanks for the info. There is no hurry. Was just curious :)

-Matthias


On 12/5/17 7:34 PM, vito jeng wrote:
> Matthias,
> 
> Still in progress. I've been busy in recent weeks with my job.
> 
> But I believe I will update this KIP within a few days.
> 
> 
> 
> 
> ---
> Vito
> 
> On Tue, Dec 5, 2017 at 8:05 AM, Matthias J. Sax 
> wrote:
> 
>> Vito,
>>
>> is there any update with regard to this KIP?
>>
>>
>> -Matthias
>>
>> On 11/5/17 6:11 PM, vito jeng wrote:
>>> Thanks, Guozhang and Matthias. Your comments are very useful for me.
>>>
>>> I'll update KIP and keep going on.
>>>
>>>
>>>
>>> ---
>>> Vito
>>>
>>> On Sun, Nov 5, 2017 at 12:30 AM, Matthias J. Sax 
>>> wrote:
>>>
 Thanks for the KIP Vito!

 I agree with what Guozhang said. The original idea of the Jira was, to
 give different exceptions for different "recovery" strategies to the
>> user.

 For example, if a store is currently recreated, a user just needs to wait
 and can query the store later. On the other hand, if a store got migrated
 to another instance, a user needs to rediscover the store instead of a
 "plain retry".

 Fatal errors might be a third category.

 Not sure if there is something else?

 Anyway, the KIP should contain a section that talks about this ideas and
 reasoning.


 -Matthias


 On 11/3/17 11:26 PM, Guozhang Wang wrote:
> Thanks for writing up the KIP.
>
> Vito, Matthias: one thing that I wanted to figure out first is what
> categories of errors we want to notify the users about; if we only want to
> distinguish fatal vs. retriable then probably we should rename the
> proposed StateStoreMigratedException / StateStoreClosedException
>> classes.
> And then from there we should list what are the possible internal
> exceptions ever thrown in those APIs in the call trace, and which
> exceptions should be wrapped to what others, and which ones should be
> handled without re-throwing, and which ones should not be wrapped at
>> all
> but directly thrown to user's face.
>
> Guozhang
>
>
> On Wed, Nov 1, 2017 at 11:09 PM, vito jeng 
>> wrote:
>
>> Hi,
>>
>> I'd like to start discuss KIP-216:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 216%3A+IQ+should+throw+different+exceptions+for+different+errors
>>
>> Please have a look.
>> Thanks!
>>
>> ---
>> Vito
>>
>
>
>


>>>
>>
>>
> 





Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Becket Qin
Hi Jun,

That is true, but in reality it seems rare that the fetch size is smaller
than the index interval. In the worst case, we may need to do another lookup.
In the future, when we have a mechanism to inform the clients about the
broker configurations, the clients may want to configure themselves
accordingly, e.g. max message size, max timestamp difference, etc.

On the other hand, we are not guaranteeing that the returned bytes in a
partition are always bounded by the per-partition fetch size, because we are
going to return at least one message, so the per-partition fetch size is
already a soft limit. Since we are introducing a new fetch protocol and
this is related, it might be worth considering this option.

BTW, one reason I bring this up again is that yesterday we had a
presentation from Uber regarding end-to-end latency, and they are
seeing this binary-search behavior impacting latency due to paging in/out
of the index file.

Thanks,

Jiangjie (Becket) Qin



On Tue, Dec 5, 2017 at 5:55 PM, Jun Rao  wrote:

> Hi, Jiangjie,
>
> Not sure returning the fetch response at the index boundary is a general
> solution. The index interval is configurable. If one configures the index
> interval larger than the per partition fetch size, we probably have to
> return data not at the index boundary.
>
> Thanks,
>
> Jun
>
> On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin  wrote:
>
> > Hi Colin,
> >
> > Thinking about this again. I do see the reason that we want to have a
> epoch
> > to avoid out of order registration of the interested set. But I am
> > wondering if the following semantic would meet what we want better:
> >  - Session Id: the id assigned to a single client for life long time. i.e
> > it does not change when the interested partitions change.
> >  - Epoch: the interested set epoch. Only updated when a full fetch
> request
> > comes, which may result in the interested partition set change.
> > This will ensure that the registered interested set will always be the
> > latest registration. And the clients can change the interested partition
> > set without creating another session.
> >
> > Also I want to bring up the way the leader respond to the FetchRequest
> > again. I think it would be a big improvement if we just return the
> > responses at index entry boundary or log end. There are a few benefits:
> > 1. The leader does not need the follower to provide the offsets,
> > 2. The fetch requests no longer need to do a binary search on the index,
> it
> > just need to do a linear access to the index file, which is much cache
> > friendly.
> >
> > Assuming the leader can get the last returned offsets to the clients
> > cheaply, I am still not sure why it is necessary for the followers to
> > repeat the offsets in the incremental fetch every time. Intuitively it
> > should only update the offsets when the leader has wrong offsets, in most
> > cases, the incremental fetch request should just be empty. Otherwise we
> may
> > not be saving much when there are continuous small requests going to each
> > partition, which could be normal for some low latency systems.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> > On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe  wrote:
> >
> > > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > > Hi Colin
> > > >
> > > > Addressing the topic of how to manage slots from the other thread.
> > > > With tcp connections all this comes for free essentially.
> > >
> > > Hi Jan,
> > >
> > > I don't think that it's accurate to say that cache management "comes
> for
> > > free" by coupling the incremental fetch session with the TCP session.
> > > When a new TCP session is started by a fetch request, you still have to
> > > decide whether to grant that request an incremental fetch session or
> > > not.  If your answer is that you always grant the request, I would
> argue
> > > that you do not have cache management.
> > >
> > > I guess you could argue that timeouts are cache management, but I don't
> > > find that argument persuasive.  Anyone could just create a lot of TCP
> > > sessions and use a lot of resources, in that case.  So there is
> > > essentially no limit on memory use.  In any case, TCP sessions don't
> > > help us implement fetch session timeouts.
> > >
> > > > I still would argue we disable it by default and make a flag in the
> > > > broker to ask the leader to maintain the cache while replicating and
> > > also only
> > > > have it optional in consumers (default to off) so one can turn it on
> > > > where it really hurts.  MirrorMaker and audit consumers prominently.
> > >
> > > I agree with Jason's point from earlier in the thread.  Adding extra
> > > configuration knobs that aren't really necessary can harm usability.
> > > Certainly asking people to manually turn on a feature "where it really
> > > hurts" seems to fall in that category, when we could easily enable it
> > > 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Boerge Svingen

Thank you for the suggestion. We considered this before. It works, but it’s a 
hack, and we would be providing a bad user experience for our consumers if we 
had to explain, “if you want to start consuming in 2014, you have to pretend to 
want 2214”.

We would rather solve the underlying problem. These are perfectly valid 
timestamps, and I can’t see any reason why Kafka shouldn’t support them - I 
don’t think using `Long.MIN_VALUE` instead of -1 would necessarily add 
complexity here?


Thanks,
Boerge.



> On 2017-12-05, at 21:36, Dong Lin  wrote:
> 
> Hey Boerge,
> 
> Thanks for the blog link. I will read this blog later.
> 
> Here is another alternative solution which may be worth thinking. We know
> that the Unix time 0 corresponds to January 1, 1970. Let's say the earliest
> time you may want to use as the timestamp of the Kafka message is within X
> milliseconds before the January 1, 1970. Then you can add X to the
> timestamp before you produce Kafka message. And you can also make similar
> conversion when you use `offsetsForTimes()` or after you consume messages.
> This seems to address your use-case without introducing negative timestamp.
> 
> IMO, this solution requires a bit more logic in your application code. But
> it keeps the Kafka timestamp logic simple and we reserve the capability to
> use timestamp -1 for messages without timestamp for most Kafka users who do
> not need negative timestamp. Do you think this would be a good alternative
> solution?
> 
> Thanks,
> Dong
> 
> 
> On Tue, Dec 5, 2017 at 5:39 PM, Boerge Svingen  wrote:
> 
>> 
>> Yes. To provide a little more detail, we are using Kafka to store
>> everything ever published by The New York Times, and to make this content
>> available to a range of systems and applications. Assets are published to
>> Kafka chronologically, so that consumers can seek to any point in time and
>> start consuming from there, like Konstantin is describing, all the way back
>> to our beginning in 1851.
>> 
>> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/ <
>> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/>
>> has more information on the use case.
>> 
>> 
>> Thanks,
>> Boerge.
>> 
>> 
>> --
>> 
>> Boerge Svingen
>> Director of Engineering
>> The New York Times
>> 
>> 
>> 
>> 
>>> On 2017-12-05, at 19:35, Dong Lin  wrote:
>>> 
>>> Hey Konstantin,
>>> 
>>> According to KIP-32 the timestamp is also used for log rolling and log
>>> retention. Therefore, unless broker is configured to never delete any
>>> message based on time, messages produced with negative timestamp in your
>>> use-case will be deleted by the broker anyway. Do you actually plan to
>> use
>>> Kafka as a persistent storage system that never delete messages?
>>> 
>>> Thanks,
>>> Dong
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin <
>> chuhlo...@gmail.com>
>>> wrote:
>>> 
 Hi Dong,
 
 Currently we are storing historical timestamp in the message.
 
 What we are trying to achieve is to make it possible to do Kafka lookup
 by timestamp. Ideally I would do `offsetsForTimes` to find articles
 published
 in 1910s (if we are storing articles on the log).
 
 So first two suggestions aren't really covering our use-case.
 
 We could create a new timestamp type like "HistoricalTimestamp" or
 "MaybeNegativeTimestamp".
 And the only difference between this one and CreateTime is that it could
 be negative.
 I tend to use CreateTime for this purpose because it's easier to
 understand from
 user perspective as a timestamp which publisher can set.
 
 Thanks,
 Konstantin
 
> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> 
> Hey Konstantin,
> 
> Thanks for the KIP. I have a few questions below.
> 
> Strictly speaking Kafka actually allows you to store historical data.
>> And
> user are free to encode arbitrary timestamp field in their Kafka
>> message.
> For example, your Kafka message can currently have Json or Avro format
 and
> you can put a timestamp field there. Do you think that could address
>> your
> use-case?
> 
> Alternatively, KIP-82 introduced Record Header in Kafka and you can
>> also
> define your customized key/value pair in the header. Do you think this
 can
> address your use-case?
> 
> Also, currently there are two types of timestamp according to KIP-32.
>> If
> the type is LogAppendTime then the timestamp value is the time when
 broker
> receives the message. If the type is CreateTime then the timestamp
>> value
 is
> determined when producer produces message. With these two definitions,
 the
> timestamp should always be positive. We probably need a new type here
>> if
 we
> can not put timestamp in the Record Header or the message 

Re: Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-05 Thread Khurrum Nasim
Jason,

Comments inline.

On Tue, Dec 5, 2017 at 10:59 AM, Jason Gustafson  wrote:

> > I believe a lot of users are using the kafka high level consumers, it is
> > effectively an **unordered** messaging/streaming pattern. People using
> high
> > level consumers don't actually need any ordering guarantees. In this
> sense,
> > a *shared* subscription in Apache Pulsar seems to be better than current
> > Kafka's consumer group model, as it allows the consumption rate not
> limited
> > by the number of partitions, can actually grow beyond the number of
> > partitions. We do see a lot of operational pain points on production
> coming
> > from consumer lags, which I think it is very commonly seen during
> partition
> > rebalancing in a consumer group. Selective acking seems to provide a
> finer
> > granularity on acknowledgment, which can be actually good for avoiding
> > consumer lags and avoid reprocessing messages during partition rebalance.
>
>
> Yeah, I'm not sure about this. I'd be interested to understand the design
> of this feature a little better. In practice, when ordering is unimportant,
> adding partitions seems not too big of a deal.


I think it depends. You can probably address the problem by adding more
partitions, if the topic is used by only one consumer group exclusively.

However, it still has some pain points:

- In a shared organization, a topic might be shared between multiple teams.
It is often hard, or at least not simple, to increase the partition count
for such a topic, especially if some team wants to consume messages in order.
- Even if you can easily increase the number of partitions, that doesn't
address the consumer lag issue: without selective acking, some of
the *acknowledged* or *processed* messages will be redelivered after
partitions are rebalanced to other consumers.



> Also, I'm aware of active
> efforts to make rebalancing less of a pain point for our users ;)
>

Can you point me to the KIPs for these efforts? I would love to keep an eye
on them.


>
> The last question, from users perspective, since both kafka and pulsar are
> > distributed pub/sub messaging systems and both of them at the ASF, is
> there
> > any possibility for these two projects to collaborate, e.g. kafka adopts
> > pulsar's messaging model, pulsar can use kafka streams and kafka
> connect. I
> > believe a lot of people in the mailing list might have same or similar
> > question. From end-user perspective, if such collaboration can happen,
> that
> > is going to great for users and also the ASF. I would like to hear any
> > thoughts from kafka committers and pmc members.
>
>
> I see this a little differently. Although there is some overlap between the
> projects, they have quite different underlying philosophies (as Marina
> alluded to) and I hope this will take them on different trajectories over
> time. That would ultimately benefit users more than having two competing
> projects solving all the same use cases. We don't need to try to cram
> Pulsar features into Kafka if it's not a good fit and vice versa. At the
> same time, where capabilities do overlap, we can try to learn from their
> experience and they can learn from ours. The example of message retention
> seemed like one of these instances since there are legitimate use cases and
> Pulsar's approach has some benefits.
>

Sure, that makes sense to me.

BTW, have you taken a look at Pulsar's Kafka API? I am wondering what you
think about it.

- KN


>
>
> -Jason
>
>
>
> On Tue, Dec 5, 2017 at 9:57 AM, Khurrum Nasim 
> wrote:
>
> > Hi Marina,
> >
> >
> > On Tue, Dec 5, 2017 at 6:58 AM, Marina Popova 
> > wrote:
> >
> > > Hi,
> > > I don't think it would be such a great idea to start modifying the very
> > > foundation of Kafka's design to accommodate more and more extra use
> > cases.
> > > Kafka became so widely adopted and popular because its creator made a
> > > brilliant decision to make it "dumb broker - smart consumer" type of
> the
> > > system, where there is no to minimal dependencies between Kafka brokers
> > and
> > > Consumers. This is what make Kafka blazingly fast and truly scalable -
> > able
> > > to handle thousands of Consumers with no impact on performance.
> > >
> >
> > I am not sure I agree with this. I think from end-user perspective, what
> > users expect is a ultra simple streaming/messaging system: applications
> > sends messages, messaging systems store and dispatch them, consumers
> > consume the messages and tell the systems that they already consumed the
> > messages. IMO whether a centralized management or decentralize management
> > doesn't really matter here if kafka is able to do things without
> impacting
> > performance.
> >
> > sometimes people assume that smarter brokers (like traditional messaging
> > brokers) can not offer high throughput and scalability, because they do
> > "too many things". but I took a look at Pulsar 

Re: [DISCUSS]KIP-216: IQ should throw different exceptions for different errors

2017-12-05 Thread vito jeng
Matthias,

Still in progress. I've been busy in recent weeks with my job.

But I believe I will update this KIP within a few days.




---
Vito

On Tue, Dec 5, 2017 at 8:05 AM, Matthias J. Sax 
wrote:

> Vito,
>
> is there any update with regard to this KIP?
>
>
> -Matthias
>
> On 11/5/17 6:11 PM, vito jeng wrote:
> > Thanks, Guozhang and Matthias. Your comments are very useful for me.
> >
> > I'll update KIP and keep going on.
> >
> >
> >
> > ---
> > Vito
> >
> > On Sun, Nov 5, 2017 at 12:30 AM, Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the KIP Vito!
> >>
> >> I agree with what Guozhang said. The original idea of the Jira was, to
> >> give different exceptions for different "recovery" strategies to the
> user.
> >>
> >> For example, if a store is currently recreated, a user just needs to wait
> >> and can query the store later. On the other hand, if a store got migrated
> >> to another instance, a user needs to rediscover the store instead of a
> >> "plain retry".
> >>
> >> Fatal errors might be a third category.
> >>
> >> Not sure if there is something else?
> >>
> >> Anyway, the KIP should contain a section that talks about this ideas and
> >> reasoning.
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 11/3/17 11:26 PM, Guozhang Wang wrote:
> >>> Thanks for writing up the KIP.
> >>>
> >>> Vito, Matthias: one thing that I wanted to figure out first is what
> >>> categories of errors we want to notify the users about; if we only want to
> >>> distinguish fatal vs. retriable then probably we should rename the
> >>> proposed StateStoreMigratedException / StateStoreClosedException
> classes.
> >>> And then from there we should list what are the possible internal
> >>> exceptions ever thrown in those APIs in the call trace, and which
> >>> exceptions should be wrapped to what others, and which ones should be
> >>> handled without re-throwing, and which ones should not be wrapped at
> all
> >>> but directly thrown to user's face.
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Wed, Nov 1, 2017 at 11:09 PM, vito jeng 
> wrote:
> >>>
>  Hi,
> 
>  I'd like to start discuss KIP-216:
> 
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  216%3A+IQ+should+throw+different+exceptions+for+different+errors
> 
>  Please have a look.
>  Thanks!
> 
>  ---
>  Vito
> 
> >>>
> >>>
> >>>
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Ted Yu
bq. you can add X to the timestamp before you produce Kafka message

This assumes the earliest timestamp (for user application) is known
beforehand.

However, what if this earliest timestamp shifts even earlier (e.g. due to
some discovery) ?

Cheers

On Tue, Dec 5, 2017 at 6:36 PM, Dong Lin  wrote:

> Hey Boerge,
>
> Thanks for the blog link. I will read this blog later.
>
> Here is another alternative solution which may be worth thinking. We know
> that the Unix time 0 corresponds to January 1, 1970. Let's say the earliest
> time you may want to use as the timestamp of the Kafka message is within X
> milliseconds before the January 1, 1970. Then you can add X to the
> timestamp before you produce Kafka message. And you can also make similar
> conversion when you use `offsetsForTimes()` or after you consume messages.
> This seems to address your use-case without introducing negative timestamp.
>
> IMO, this solution requires a bit more logic in your application code. But
> it keeps the Kafka timestamp logic simple and we reserve the capability to
> use timestamp -1 for messages without timestamp for most Kafka users who do
> not need negative timestamp. Do you think this would be a good alternative
> solution?
>
> Thanks,
> Dong
>
>
> On Tue, Dec 5, 2017 at 5:39 PM, Boerge Svingen 
> wrote:
>
> >
> > Yes. To provide a little more detail, we are using Kafka to store
> > everything ever published by The New York Times, and to make this content
> > available to a range of systems and applications. Assets are published to
> > Kafka chronologically, so that consumers can seek to any point in time
> and
> > start consuming from there, like Konstantin is describing, all the way
> back
> > to our beginning in 1851.
> >
> > https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/ <
> > https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/>
> > has more information on the use case.
> >
> >
> > Thanks,
> > Boerge.
> >
> >
> > --
> >
> > Boerge Svingen
> > Director of Engineering
> > The New York Times
> >
> >
> >
> >
> > > On 2017-12-05, at 19:35, Dong Lin  wrote:
> > >
> > > Hey Konstantin,
> > >
> > > According to KIP-32 the timestamp is also used for log rolling and log
> > > retention. Therefore, unless broker is configured to never delete any
> > > message based on time, messages produced with negative timestamp in
> your
> > > use-case will be deleted by the broker anyway. Do you actually plan to
> > use
> > > Kafka as a persistent storage system that never delete messages?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > >
> > > On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin <
> > chuhlo...@gmail.com>
> > > wrote:
> > >
> > >> Hi Dong,
> > >>
> > >> Currently we are storing historical timestamp in the message.
> > >>
> > >> What we are trying to achieve is to make it possible to do Kafka
> lookup
> > >> by timestamp. Ideally I would do `offsetsForTimes` to find articles
> > >> published
> > >> in 1910s (if we are storing articles on the log).
> > >>
> > >> So first two suggestions aren't really covering our use-case.
> > >>
> > >> We could create a new timestamp type like "HistoricalTimestamp" or
> > >> "MaybeNegativeTimestamp".
> > >> And the only difference between this one and CreateTime is that it
> could
> > >> be negative.
> > >> I tend to use CreateTime for this purpose because it's easier to
> > >> understand from
> > >> user perspective as a timestamp which publisher can set.
> > >>
> > >> Thanks,
> > >> Konstantin
> > >>
> > >>> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> > >>>
> > >>> Hey Konstantin,
> > >>>
> > >>> Thanks for the KIP. I have a few questions below.
> > >>>
> > >>> Strictly speaking Kafka actually allows you to store historical data.
> > And
> > >>> user are free to encode arbitrary timestamp field in their Kafka
> > message.
> > >>> For example, your Kafka message can currently have Json or Avro
> format
> > >> and
> > >>> you can put a timestamp field there. Do you think that could address
> > your
> > >>> use-case?
> > >>>
> > >>> Alternatively, KIP-82 introduced Record Header in Kafka and you can
> > also
> > >>> define your customized key/value pair in the header. Do you think
> this
> > >> can
> > >>> address your use-case?
> > >>>
> > >>> Also, currently there are two types of timestamp according to KIP-32.
> > If
> > >>> the type is LogAppendTime then the timestamp value is the time when
> > >> broker
> > >>> receives the message. If the type is CreateTime then the timestamp
> > value
> > >> is
> > >>> determined when producer produces message. With these two
> definitions,
> > >> the
> > >>> timestamp should always be positive. We probably need a new type here
> > if
> > >> we
> > >>> can not put timestamp in the Record Header or the message payload.
> Does
> > >>> this sound reasonable?
> > >>>
> > >>> Thanks,
> > >>> Dong
> > >>>
> > >>>
> > >>>

[jira] [Created] (KAFKA-6313) Kafka Core maven dependencies are missing SLF4J API

2017-12-05 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6313:


 Summary: Kafka Core maven dependencies are missing SLF4J API
 Key: KAFKA-6313
 URL: https://issues.apache.org/jira/browse/KAFKA-6313
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Randall Hauch
Priority: Critical
 Fix For: 1.1.0


In an application that depends on the Kafka server artifacts with:

{code:xml}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.0-SNAPSHOT</version>
</dependency>
{code}

and then running the server programmatically, the following error occurs:

{noformat}
[2017-11-23 01:01:45,029] INFO Shutting down producer 
(kafka.producer.Producer:63)
[2017-11-23 01:01:45,051] INFO Closing all sync producers 
(kafka.producer.ProducerPool:63)
[2017-11-23 01:01:45,052] INFO Producer shutdown completed in 23 ms 
(kafka.producer.Producer:63)
[2017-11-23 01:01:45,052] INFO [KafkaServer id=1] shutting down 
(kafka.server.KafkaServer:63)
[2017-11-23 01:01:45,057] ERROR [KafkaServer id=1] Fatal error during 
KafkaServer shutdown. (kafka.server.KafkaServer:161)
java.lang.NoClassDefFoundError: org/slf4j/event/Level
at kafka.utils.CoreUtils$.swallow$default$3(CoreUtils.scala:83)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:520)
   ...
Caused by: java.lang.ClassNotFoundException: org.slf4j.event.Level
at java.net.URLClassLoader$1.run(URLClassLoader.java:359)
at java.net.URLClassLoader$1.run(URLClassLoader.java:348)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:347)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:312)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 25 more
{noformat}

It appears that KAFKA-1044 and [this 
PR|https://github.com/apache/kafka/pull/3477] removed the use of Log4J from 
Core but [added use 
of|https://github.com/confluentinc/kafka/commit/ed8b0315a6c3705b2a163ce3ab4723234779264f#diff-52505b9374ea885e44bcb73cbc4714d6R34]
 the {{org.slf4j.event.Level}} in {{CoreUtils.scala}}. The 
{{org.slf4j.event.Level}} class is in the {{org.slf4j:slf4j-api}} artifact, 
which is currently not included in the dependencies of 
{{org.apache.kafka:kafka_2.11:1.1.0-SNAPSHOT}}. Because this is needed by the 
server, the SLF4J API library probably needs to be added to the dependencies.
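
Until that is sorted out, applications hitting this can work around it by declaring the SLF4J API explicitly themselves (the version below is illustrative):

{code:xml}
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
{code}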

[~viktorsomogyi] and [~ijuma], was this intentional, or is it intended that the 
SLF4J API be marked as {{provided}}? BTW, I marked this as CRITICAL just 
because this probably needs to be sorted out before the release.





Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Dong Lin
Hey Boerge,

Thanks for the blog link. I will read this blog later.

Here is another alternative solution which may be worth thinking. We know
that the Unix time 0 corresponds to January 1, 1970. Let's say the earliest
time you may want to use as the timestamp of the Kafka message is within X
milliseconds before the January 1, 1970. Then you can add X to the
timestamp before you produce Kafka message. And you can also make similar
conversion when you use `offsetsForTimes()` or after you consume messages.
This seems to address your use-case without introducing negative timestamp.

IMO, this solution requires a bit more logic in your application code. But
it keeps the Kafka timestamp logic simple and we reserve the capability to
use timestamp -1 for messages without timestamp for most Kafka users who do
not need negative timestamp. Do you think this would be a good alternative
solution?
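
For illustration, a sketch of that shift-by-X idea, purely on the application side (the shift amount and class are made up for the example):

    import java.util.concurrent.TimeUnit;

    public class ShiftedTimestampSketch {
        // Illustrative X: large enough to cover the earliest pre-1970 time the
        // application cares about (here, roughly 10,000 years before the epoch).
        static final long SHIFT_MS = TimeUnit.DAYS.toMillis(365L * 10_000);

        static long toKafkaTimestamp(long eventTimeMs) {        // apply before producing
            return eventTimeMs + SHIFT_MS;
        }

        static long fromKafkaTimestamp(long kafkaTimestampMs) { // apply after consuming
            return kafkaTimestampMs - SHIFT_MS;
        }

        public static void main(String[] args) {
            long publishedIn1851 = -3_755_000_000_000L;          // a Unix time in 1851, in ms
            long stored = toKafkaTimestamp(publishedIn1851);      // non-negative, safe to produce
            System.out.println(fromKafkaTimestamp(stored) == publishedIn1851); // true
            // offsetsForTimes(...) lookups would likewise pass toKafkaTimestamp(targetMs).
        }
    }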

Thanks,
Dong


On Tue, Dec 5, 2017 at 5:39 PM, Boerge Svingen  wrote:

>
> Yes. To provide a little more detail, we are using Kafka to store
> everything ever published by The New York Times, and to make this content
> available to a range of systems and applications. Assets are published to
> Kafka chronologically, so that consumers can seek to any point in time and
> start consuming from there, like Konstantin is describing, all the way back
> to our beginning in 1851.
>
> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/ <
> https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/>
> has more information on the use case.
>
>
> Thanks,
> Boerge.
>
>
> --
>
> Boerge Svingen
> Director of Engineering
> The New York Times
>
>
>
>
> > On 2017-12-05, at 19:35, Dong Lin  wrote:
> >
> > Hey Konstantin,
> >
> > According to KIP-32 the timestamp is also used for log rolling and log
> > retention. Therefore, unless broker is configured to never delete any
> > message based on time, messages produced with negative timestamp in your
> > use-case will be deleted by the broker anyway. Do you actually plan to
> use
> > Kafka as a persistent storage system that never delete messages?
> >
> > Thanks,
> > Dong
> >
> >
> >
> >
> > On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin <
> chuhlo...@gmail.com>
> > wrote:
> >
> >> Hi Dong,
> >>
> >> Currently we are storing historical timestamp in the message.
> >>
> >> What we are trying to achieve is to make it possible to do Kafka lookup
> >> by timestamp. Ideally I would do `offsetsForTimes` to find articles
> >> published
> >> in 1910s (if we are storing articles on the log).
> >>
> >> So first two suggestions aren't really covering our use-case.
> >>
> >> We could create a new timestamp type like "HistoricalTimestamp" or
> >> "MaybeNegativeTimestamp".
> >> And the only difference between this one and CreateTime is that it could
> >> be negative.
> >> I tend to use CreateTime for this purpose because it's easier to
> >> understand from
> >> user perspective as a timestamp which publisher can set.
> >>
> >> Thanks,
> >> Konstantin
> >>
> >>> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> >>>
> >>> Hey Konstantin,
> >>>
> >>> Thanks for the KIP. I have a few questions below.
> >>>
> >>> Strictly speaking Kafka actually allows you to store historical data.
> And
> >>> user are free to encode arbitrary timestamp field in their Kafka
> message.
> >>> For example, your Kafka message can currently have Json or Avro format
> >> and
> >>> you can put a timestamp field there. Do you think that could address
> your
> >>> use-case?
> >>>
> >>> Alternatively, KIP-82 introduced Record Header in Kafka and you can
> also
> >>> define your customized key/value pair in the header. Do you think this
> >> can
> >>> address your use-case?
> >>>
> >>> Also, currently there are two types of timestamp according to KIP-32.
> If
> >>> the type is LogAppendTime then the timestamp value is the time when
> >> broker
> >>> receives the message. If the type is CreateTime then the timestamp
> value
> >> is
> >>> determined when producer produces message. With these two definitions,
> >> the
> >>> timestamp should always be positive. We probably need a new type here
> if
> >> we
> >>> can not put timestamp in the Record Header or the message payload. Does
> >>> this sound reasonable?
> >>>
> >>> Thanks,
> >>> Dong
> >>>
> >>>
> >>>
> >>> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
> >> chuhlo...@gmail.com>
> >>> wrote:
> >>>
>  Hi all,
> 
>  I have created a KIP to support negative timestamp:
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  228+Negative+record+timestamp+support   confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support>
> 
>  Here are proposed changes: https://github.com/apache/
>  kafka/compare/trunk...chuhlomin:trunk   kafka/compare/trunk...chuhlomin:trunk>
> 
>  I'm 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Jun Rao
Hi, Jiangjie,

Not sure returning the fetch response at the index boundary is a general
solution. The index interval is configurable. If one configures the index
interval larger than the per partition fetch size, we probably have to
return data not at the index boundary.

Thanks,

Jun

On Tue, Dec 5, 2017 at 4:17 PM, Becket Qin  wrote:

> Hi Colin,
>
> Thinking about this again. I do see the reason that we want to have a epoch
> to avoid out of order registration of the interested set. But I am
> wondering if the following semantic would meet what we want better:
>  - Session Id: the id assigned to a single client for life long time. i.e
> it does not change when the interested partitions change.
>  - Epoch: the interested set epoch. Only updated when a full fetch request
> comes, which may result in the interested partition set change.
> This will ensure that the registered interested set will always be the
> latest registration. And the clients can change the interested partition
> set without creating another session.
>
> Also I want to bring up the way the leader respond to the FetchRequest
> again. I think it would be a big improvement if we just return the
> responses at index entry boundary or log end. There are a few benefits:
> 1. The leader does not need the follower to provide the offsets,
> 2. The fetch requests no longer need to do a binary search on the index, it
> just need to do a linear access to the index file, which is much cache
> friendly.
>
> Assuming the leader can get the last returned offsets to the clients
> cheaply, I am still not sure why it is necessary for the followers to
> repeat the offsets in the incremental fetch every time. Intuitively it
> should only update the offsets when the leader has wrong offsets, in most
> cases, the incremental fetch request should just be empty. Otherwise we may
> not be saving much when there are continuous small requests going to each
> partition, which could be normal for some low latency systems.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe  wrote:
>
> > On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > > Hi Colin
> > >
> > > Addressing the topic of how to manage slots from the other thread.
> > > With tcp connections all this comes for free essentially.
> >
> > Hi Jan,
> >
> > I don't think that it's accurate to say that cache management "comes for
> > free" by coupling the incremental fetch session with the TCP session.
> > When a new TCP session is started by a fetch request, you still have to
> > decide whether to grant that request an incremental fetch session or
> > not.  If your answer is that you always grant the request, I would argue
> > that you do not have cache management.
> >
> > I guess you could argue that timeouts are cache management, but I don't
> > find that argument persuasive.  Anyone could just create a lot of TCP
> > sessions and use a lot of resources, in that case.  So there is
> > essentially no limit on memory use.  In any case, TCP sessions don't
> > help us implement fetch session timeouts.
> >
> > > I still would argue we disable it by default and make a flag in the
> > > broker to ask the leader to maintain the cache while replicating and
> > also only
> > > have it optional in consumers (default to off) so one can turn it on
> > > where it really hurts.  MirrorMaker and audit consumers prominently.
> >
> > I agree with Jason's point from earlier in the thread.  Adding extra
> > configuration knobs that aren't really necessary can harm usability.
> > Certainly asking people to manually turn on a feature "where it really
> > hurts" seems to fall in that category, when we could easily enable it
> > automatically for them.
> >
> > >
> > > Otherwise I left a few remarks in-line, which should help to understand
> > > my view of the situation better
> > >
> > > Best Jan
> > >
> > >
> > > On 05.12.2017 08:06, Colin McCabe wrote:
> > > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > >>
> > > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > >  Thanks for the explanation, Colin. A few more questions.
> > > 
> > > > The session epoch is not complex.  It's just a number which
> > increments
> > > > on each incremental fetch.  The session epoch is also useful for
> > > > debugging-- it allows you to match up requests and responses when
> > > > looking at log files.
> > >  Currently each request in Kafka has a correlation id to help match
> > the
> > >  requests and responses. Is epoch doing something differently?
> > > >>> Hi Becket,
> > > >>>
> > > >>> The correlation ID is used within a single TCP session, to uniquely
> > > >>> associate a request with a response.  The correlation ID is not
> > unique
> > > >>> (and has no meaning) outside the context of that single TCP
> session.
> > > >>>
> > > >>> Keep in mind, 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Boerge Svingen

Yes. To provide a little more detail, we are using Kafka to store everything 
ever published by The New York Times, and to make this content available to a 
range of systems and applications. Assets are published to Kafka 
chronologically, so that consumers can seek to any point in time and start 
consuming from there, like Konstantin is describing, all the way back to our 
beginning in 1851.

https://www.confluent.io/blog/publishing-apache-kafka-new-york-times/ 
 has 
more information on the use case.


Thanks,
Boerge.


-- 

Boerge Svingen
Director of Engineering
The New York Times




> On 2017-12-05, at 19:35, Dong Lin  wrote:
> 
> Hey Konstantin,
> 
> According to KIP-32 the timestamp is also used for log rolling and log
> retention. Therefore, unless broker is configured to never delete any
> message based on time, messages produced with negative timestamp in your
> use-case will be deleted by the broker anyway. Do you actually plan to use
> Kafka as a persistent storage system that never delete messages?
> 
> Thanks,
> Dong
> 
> 
> 
> 
> On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin 
> wrote:
> 
>> Hi Dong,
>> 
>> Currently we are storing historical timestamp in the message.
>> 
>> What we are trying to achieve is to make it possible to do Kafka lookup
>> by timestamp. Ideally I would do `offsetsForTimes` to find articles
>> published
>> in 1910s (if we are storing articles on the log).
>> 
>> So first two suggestions aren't really covering our use-case.
>> 
>> We could create a new timestamp type like "HistoricalTimestamp" or
>> "MaybeNegativeTimestamp".
>> And the only difference between this one and CreateTime is that it could
>> be negative.
>> I tend to use CreateTime for this purpose because it's easier to
>> understand from
>> user perspective as a timestamp which publisher can set.
>> 
>> Thanks,
>> Konstantin
>> 
>>> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
>>> 
>>> Hey Konstantin,
>>> 
>>> Thanks for the KIP. I have a few questions below.
>>> 
>>> Strictly speaking Kafka actually allows you to store historical data. And
>>> user are free to encode arbitrary timestamp field in their Kafka message.
>>> For example, your Kafka message can currently have Json or Avro format
>> and
>>> you can put a timestamp field there. Do you think that could address your
>>> use-case?
>>> 
>>> Alternatively, KIP-82 introduced Record Header in Kafka and you can also
>>> define your customized key/value pair in the header. Do you think this
>> can
>>> address your use-case?
>>> 
>>> Also, currently there are two types of timestamp according to KIP-32. If
>>> the type is LogAppendTime then the timestamp value is the time when
>> broker
>>> receives the message. If the type is CreateTime then the timestamp value
>> is
>>> determined when producer produces message. With these two definitions,
>> the
>>> timestamp should always be positive. We probably need a new type here if
>> we
>>> can not put timestamp in the Record Header or the message payload. Does
>>> this sound reasonable?
>>> 
>>> Thanks,
>>> Dong
>>> 
>>> 
>>> 
>>> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
>> chuhlo...@gmail.com>
>>> wrote:
>>> 
 Hi all,
 
 I have created a KIP to support negative timestamp:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 228+Negative+record+timestamp+support 
 
 Here are proposed changes: https://github.com/apache/
 kafka/compare/trunk...chuhlomin:trunk 
 
 I'm pretty sure that not all cases are covered, so comments and suggestions
 are welcome.
 
 Thank you,
 Konstantin
>> 
>> 



[jira] [Reopened] (KAFKA-5878) Add sensor for queue size of the controller-event-thread

2017-12-05 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reopened KAFKA-5878:
-

> Add sensor for queue size of the controller-event-thread
> 
>
> Key: KAFKA-5878
> URL: https://issues.apache.org/jira/browse/KAFKA-5878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>






Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Ted Yu
What if the negative timestamp is stored this way ?

Long.MIN_VALUE + delta (where delta is positive)

and calculated this way when used:

1/1/1970 - delta

This approach avoids the ambiguity of -1 timestamp since -1 would be stored
as Long.MIN_VALUE+1

Log retention can handle such format with minor modification.
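
For illustration, a sketch of that encoding (not an existing Kafka API; names are made up):

    public class PreEpochEncodingSketch {
        // A pre-epoch instant "1970-01-01 minus delta" is stored as Long.MIN_VALUE + delta,
        // so a raw -1 stays unambiguous (1 ms before the epoch becomes Long.MIN_VALUE + 1).
        static long encode(long deltaMs) {             // deltaMs > 0, milliseconds before the epoch
            return Long.MIN_VALUE + deltaMs;
        }

        static long decodeToEpochMillis(long stored) { // recovers the (negative) Unix time
            return -(stored - Long.MIN_VALUE);         // i.e. 1970-01-01 minus deltaMs
        }

        public static void main(String[] args) {
            long stored = encode(1L);                        // 1 ms before the epoch
            System.out.println(decodeToEpochMillis(stored)); // prints -1
        }
    }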

Just my two cents.

On Tue, Dec 5, 2017 at 4:35 PM, Dong Lin  wrote:

> Hey Konstantin,
>
> According to KIP-32 the timestamp is also used for log rolling and log
> retention. Therefore, unless broker is configured to never delete any
> message based on time, messages produced with negative timestamp in your
> use-case will be deleted by the broker anyway. Do you actually plan to use
> Kafka as a persistent storage system that never delete messages?
>
> Thanks,
> Dong
>
>
>
>
> On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin  >
> wrote:
>
> > Hi Dong,
> >
> > Currently we are storing historical timestamp in the message.
> >
> > What we are trying to achieve is to make it possible to do Kafka lookup
> > by timestamp. Ideally I would do `offsetsForTimes` to find articles
> > published
> > in 1910s (if we are storing articles on the log).
> >
> > So first two suggestions aren't really covering our use-case.
> >
> > We could create a new timestamp type like "HistoricalTimestamp" or
> > "MaybeNegativeTimestamp".
> > And the only difference between this one and CreateTime is that it could
> > be negative.
> > I tend to use CreateTime for this purpose because it's easier to
> > understand from
> > user perspective as a timestamp which publisher can set.
> >
> > Thanks,
> > Konstantin
> >
> > > On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> > >
> > > Hey Konstantin,
> > >
> > > Thanks for the KIP. I have a few questions below.
> > >
> > > Strictly speaking Kafka actually allows you to store historical data.
> And
> > > user are free to encode arbitrary timestamp field in their Kafka
> message.
> > > For example, your Kafka message can currently have Json or Avro format
> > and
> > > you can put a timestamp field there. Do you think that could address
> your
> > > use-case?
> > >
> > > Alternatively, KIP-82 introduced Record Header in Kafka and you can
> also
> > > define your customized key/value pair in the header. Do you think this
> > can
> > > address your use-case?
> > >
> > > Also, currently there are two types of timestamp according to KIP-32.
> If
> > > the type is LogAppendTime then the timestamp value is the time when
> > broker
> > > receives the message. If the type is CreateTime then the timestamp
> value
> > is
> > > determined when producer produces message. With these two definitions,
> > the
> > > timestamp should always be positive. We probably need a new type here
> if
> > we
> > > can not put timestamp in the Record Header or the message payload. Does
> > > this sound reasonable?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
> > chuhlo...@gmail.com>
> > > wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have created a KIP to support negative timestamp:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 228+Negative+record+timestamp+support  > >> confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support>
> > >>
> > >> Here are proposed changes: https://github.com/apache/
> > >> kafka/compare/trunk...chuhlomin:trunk  > >> kafka/compare/trunk...chuhlomin:trunk>
> > >>
> > >> I'm pretty sure that not cases are covered, so comments and
> suggestions
> > >> are welcome.
> > >>
> > >> Thank you,
> > >> Konstantin
> >
> >
>


[jira] [Resolved] (KAFKA-5878) Add sensor for queue size of the controller-event-thread

2017-12-05 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5878.
-
Resolution: Won't Fix

> Add sensor for queue size of the controller-event-thread
> 
>
> Key: KAFKA-5878
> URL: https://issues.apache.org/jira/browse/KAFKA-5878
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6285) OffsetCommitRequest should have read-after-write logic

2017-12-05 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6285.
-
Resolution: Duplicate

Duplicate of KAFKA-6285

> OffsetCommitRequest should have read-after-write logic
> --
>
> Key: KAFKA-6285
> URL: https://issues.apache.org/jira/browse/KAFKA-6285
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>
> Currently OffsetCommitRequest does not have read-after-write logic, and a 
> consumer can possibly read an older offset value after successfully 
> committing the offset. This is because the broker may respond to 
> OffsetCommitRequest before writing the offset to disk and memory.
> This is probably not a problem for most users, who do not immediately read 
> the offset after committing it. But it can be a problem if the broker fails 
> before writing the offset to disk. It would be nice if we could have 
> read-after-write logic for OffsetCommitRequest.
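[Editor's note: a minimal client-side illustration of the symptom described above; the offset value and method usage are only for illustration, and the proposed fix itself is broker-side.]

    // Illustrative only: how the stale read described above would be observed.
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;

    class ReadAfterWriteExample {
        static void commitThenRead(KafkaConsumer<String, String> consumer, TopicPartition tp) {
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(42L)));
            // Without read-after-write semantics on the broker, this fetch may still
            // observe an offset older than 42 even though the commit above succeeded.
            OffsetAndMetadata committed = consumer.committed(tp);
            System.out.println("Committed offset observed after commit: "
                    + (committed == null ? "none" : committed.offset()));
        }
    }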



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4295: KAFKA-6299. Fix AdminClient error handling when me...

2017-12-05 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/4295

KAFKA-6299. Fix AdminClient error handling when metadata changes

AdminClient should only call Metadata#requestUpdate when needed.

AdminClient should retry requests for which the controller has changed.

Fix an issue where AdminClient requests might not get a security
exception, even when a metadata fetch fails with an authorization
exception.

Fix a possible issue where AdminClient might leak a socket after the
timeout expires on a hard close, if a very narrow race condition is
hit.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka KAFKA-6299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4295


commit 5f548050131d4ff9078cba3e20062cc17788eca8
Author: Colin P. Mccabe 
Date:   2017-12-06T00:32:51Z

KAFKA-6299. Fix AdminClient error handling when metadata changes

AdminClient should only call Metadata#requestUpdate when needed.

AdminClient should retry requests for which the controller has changed.

Fix an issue where AdminClient requests might not get a security
exception, even when a metadata fetch fails with an authorization
exception.

Fix a possible issue where AdminClient might leak a socket after the
timeout expires on a hard close, if a very narrow race condition is
hit.




---


Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Dong Lin
Hey Konstantin,

According to KIP-32 the timestamp is also used for log rolling and log
retention. Therefore, unless the broker is configured to never delete any
messages based on time, messages produced with a negative timestamp in your
use-case will be deleted by the broker anyway. Do you actually plan to use
Kafka as a persistent storage system that never deletes messages?

Thanks,
Dong




On Tue, Dec 5, 2017 at 1:24 PM, Konstantin Chukhlomin 
wrote:

> Hi Dong,
>
> Currently we are storing historical timestamp in the message.
>
> What we are trying to achieve is to make it possible to do Kafka lookup
> by timestamp. Ideally I would do `offsetsForTimes` to find articles
> published
> in 1910s (if we are storing articles on the log).
>
> So first two suggestions aren't really covering our use-case.
>
> We could create a new timestamp type like "HistoricalTimestamp" or
> "MaybeNegativeTimestamp".
> And the only difference between this one and CreateTime is that it could
> be negative.
> I tend to use CreateTime for this purpose because it's easier to
> understand from
> user perspective as a timestamp which publisher can set.
>
> Thanks,
> Konstantin
>
> > On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> >
> > Hey Konstantin,
> >
> > Thanks for the KIP. I have a few questions below.
> >
> > Strictly speaking Kafka actually allows you to store historical data. And
> > user are free to encode arbitrary timestamp field in their Kafka message.
> > For example, your Kafka message can currently have Json or Avro format
> and
> > you can put a timestamp field there. Do you think that could address your
> > use-case?
> >
> > Alternatively, KIP-82 introduced Record Header in Kafka and you can also
> > define your customized key/value pair in the header. Do you think this
> can
> > address your use-case?
> >
> > Also, currently there are two types of timestamp according to KIP-32. If
> > the type is LogAppendTime then the timestamp value is the time when
> broker
> > receives the message. If the type is CreateTime then the timestamp value
> is
> > determined when producer produces message. With these two definitions,
> the
> > timestamp should always be positive. We probably need a new type here if
> we
> > can not put timestamp in the Record Header or the message payload. Does
> > this sound reasonable?
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
> chuhlo...@gmail.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> I have created a KIP to support negative timestamp:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 228+Negative+record+timestamp+support  >> confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support>
> >>
> >> Here are proposed changes: https://github.com/apache/
> >> kafka/compare/trunk...chuhlomin:trunk  >> kafka/compare/trunk...chuhlomin:trunk>
> >>
> >> I'm pretty sure that not cases are covered, so comments and suggestions
> >> are welcome.
> >>
> >> Thank you,
> >> Konstantin
>
>


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Becket Qin
Hi Colin,

Thinking about this again, I do see the reason that we want to have an epoch
to avoid out-of-order registration of the interested set. But I am
wondering if the following semantics would meet what we want better:
 - Session Id: the id assigned to a single client for its entire lifetime, i.e.
it does not change when the interested partitions change.
 - Epoch: the interested-set epoch. Only updated when a full fetch request
comes in, which may result in a change of the interested partition set.
This will ensure that the registered interested set is always the
latest registration, and the clients can change the interested partition
set without creating another session.

Also I want to bring up the way the leader responds to the FetchRequest
again. I think it would be a big improvement if we just returned the
responses at an index entry boundary or at the log end. There are a few benefits:
1. The leader does not need the follower to provide the offsets.
2. The fetch requests no longer need to do a binary search on the index; they
just need to do a linear scan of the index file, which is much more cache
friendly.

Assuming the leader can get the last offsets returned to the clients
cheaply, I am still not sure why it is necessary for the followers to
repeat the offsets in the incremental fetch every time. Intuitively they
should only update the offsets when the leader has wrong offsets; in most
cases, the incremental fetch request should just be empty. Otherwise we may
not be saving much when there are continuous small requests going to each
partition, which could be normal for some low-latency systems.

Thanks,

Jiangjie (Becket) Qin
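[Editor's note: a small sketch of the session-id/epoch bookkeeping suggested above. Types and names are hypothetical, and partitions are represented as plain strings for brevity.]

    // Hypothetical sketch of the proposed semantics: the session id never changes,
    // and the epoch only advances when a full fetch request re-registers the set.
    import java.util.Set;

    final class FetchSessionState {
        final long sessionId;                      // assigned once for the client's lifetime
        private int epoch = 0;                     // bumped only by full fetch requests
        private Set<String> interestedPartitions;  // partitions as strings, for brevity

        FetchSessionState(long sessionId, Set<String> initialPartitions) {
            this.sessionId = sessionId;
            this.interestedPartitions = initialPartitions;
        }

        // A full fetch request replaces the registration; an out-of-order (older)
        // registration carries a smaller epoch and is rejected.
        synchronized void onFullFetch(int requestEpoch, Set<String> partitions) {
            if (requestEpoch <= epoch) {
                throw new IllegalStateException("stale registration, epoch " + requestEpoch);
            }
            epoch = requestEpoch;
            interestedPartitions = partitions;
        }

        // Incremental fetches simply reuse the latest registered set.
        synchronized Set<String> interestedSet() {
            return interestedPartitions;
        }
    }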




On Tue, Dec 5, 2017 at 2:14 PM, Colin McCabe  wrote:

> On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> > Hi Colin
> >
> > Addressing the topic of how to manage slots from the other thread.
> > With tcp connections all this comes for free essentially.
>
> Hi Jan,
>
> I don't think that it's accurate to say that cache management "comes for
> free" by coupling the incremental fetch session with the TCP session.
> When a new TCP session is started by a fetch request, you still have to
> decide whether to grant that request an incremental fetch session or
> not.  If your answer is that you always grant the request, I would argue
> that you do not have cache management.
>
> I guess you could argue that timeouts are cache management, but I don't
> find that argument persuasive.  Anyone could just create a lot of TCP
> sessions and use a lot of resources, in that case.  So there is
> essentially no limit on memory use.  In any case, TCP sessions don't
> help us implement fetch session timeouts.
>
> > I still would argue we disable it by default and make a flag in the
> > broker to ask the leader to maintain the cache while replicating and
> also only
> > have it optional in consumers (default to off) so one can turn it on
> > where it really hurts.  MirrorMaker and audit consumers prominently.
>
> I agree with Jason's point from earlier in the thread.  Adding extra
> configuration knobs that aren't really necessary can harm usability.
> Certainly asking people to manually turn on a feature "where it really
> hurts" seems to fall in that category, when we could easily enable it
> automatically for them.
>
> >
> > Otherwise I left a few remarks in-line, which should help to understand
> > my view of the situation better
> >
> > Best Jan
> >
> >
> > On 05.12.2017 08:06, Colin McCabe wrote:
> > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > >>
> > >> On 03.12.2017 21:55, Colin McCabe wrote:
> > >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> >  Thanks for the explanation, Colin. A few more questions.
> > 
> > > The session epoch is not complex.  It's just a number which
> increments
> > > on each incremental fetch.  The session epoch is also useful for
> > > debugging-- it allows you to match up requests and responses when
> > > looking at log files.
> >  Currently each request in Kafka has a correlation id to help match
> the
> >  requests and responses. Is epoch doing something differently?
> > >>> Hi Becket,
> > >>>
> > >>> The correlation ID is used within a single TCP session, to uniquely
> > >>> associate a request with a response.  The correlation ID is not
> unique
> > >>> (and has no meaning) outside the context of that single TCP session.
> > >>>
> > >>> Keep in mind, NetworkClient is in charge of TCP sessions, and
> generally
> > >>> tries to hide that information from the upper layers of the code.  So
> > >>> when you submit a request to NetworkClient, you don't know if that
> > >>> request creates a TCP session, or reuses an existing one.
> > > Unfortunately, this doesn't work.  Imagine the client misses an
> > > increment fetch response about a partition.  And then the
> partition is
> > > never updated after that.  The client has no way to know about the
> > > partition, since it won't be included in 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Matthias J. Sax
Thanks for the KIP Konstantin.

From my understanding, you propose to just remove the negative timestamp
check in KafkaProducer and KafkaStreams. If topics are configured with
`CreateTime`, brokers will also write negative timestamps if they are
embedded in the message.

However, I am not sure about the overlapping semantics for the -1 timestamp.
My concern is that this ambiguity might result in issues. Assume that
there is a topic (configured with `CreateTime`) to which an old and a
new producer are writing. The old producer uses the old message format and
does not include any timestamp in the message. The broker will "upgrade"
this message to the new format and set the timestamp to -1. At the same time,
the new producer could write a message with the valid timestamp -1. A consumer
could not distinguish between the two cases...

Also, there might be other producer implementations that write negative
timestamps, so those might already exist. For Streams, we don't
process those and we should make sure to keep it this way (to avoid
ambiguity).

Thus, it might actually make sense to introduce a new timestamp type to
express those new semantics. The question is still how to deal with
older producer clients that want to write to those topics.

 - We could either use `Long.MIN_VALUE` as "unknown" (this would be way
better than -1, as it's not in the middle of the range but at the very
end, and it would also have well-defined semantics).
 - Or we use a "mixed mode" where we use broker wall-clock time for
older message formats (i.e., append-time semantics for older producers).
 - Third, we could even return an error to older producers;
this might change the backward-compatibility guarantees Kafka has provided
so far when upgrading brokers. However, this would not affect existing
topics, but only newly created ones (and we could disallow changing the
semantics of an existing topic to the new timestamp type, to guard against
misconfiguration). Thus, it might be ok.

For Streams, we could check the topic config and process negative
timestamps only if the topic is configured with the new timestamp type.


Maybe I am a little bit too paranoid about overloading the -1 semantics.
Curious to get feedback from others.



-Matthias
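[Editor's note: to make the sentinel option concrete, a purely illustrative sketch. The constant below is hypothetical; Kafka currently uses -1 as the "no timestamp" marker.]

    // Illustrative only: a sentinel at the very end of the range cannot collide
    // with a legitimate (possibly negative) create-time timestamp.
    final class TimestampSemantics {
        static final long NO_TIMESTAMP = Long.MIN_VALUE;  // hypothetical sentinel

        static boolean hasTimestamp(long timestampMs) {
            return timestampMs != NO_TIMESTAMP;
        }

        public static void main(String[] args) {
            System.out.println(hasTimestamp(-1L));           // true: -1 is a valid historical timestamp
            System.out.println(hasTimestamp(NO_TIMESTAMP));  // false: e.g. an upgraded old-format message
        }
    }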


On 12/5/17 1:24 PM, Konstantin Chukhlomin wrote:
> Hi Dong,
> 
> Currently we are storing historical timestamp in the message.
> 
> What we are trying to achieve is to make it possible to do Kafka lookup 
> by timestamp. Ideally I would do `offsetsForTimes` to find articles published 
> in 1910s (if we are storing articles on the log).
> 
> So first two suggestions aren't really covering our use-case.
> 
> We could create a new timestamp type like "HistoricalTimestamp" or 
> "MaybeNegativeTimestamp".
> And the only difference between this one and CreateTime is that it could be 
> negative.
> I tend to use CreateTime for this purpose because it's easier to understand 
> from 
> user perspective as a timestamp which publisher can set.
> 
> Thanks,
> Konstantin
> 
>> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
>>
>> Hey Konstantin,
>>
>> Thanks for the KIP. I have a few questions below.
>>
>> Strictly speaking Kafka actually allows you to store historical data. And
>> user are free to encode arbitrary timestamp field in their Kafka message.
>> For example, your Kafka message can currently have Json or Avro format and
>> you can put a timestamp field there. Do you think that could address your
>> use-case?
>>
>> Alternatively, KIP-82 introduced Record Header in Kafka and you can also
>> define your customized key/value pair in the header. Do you think this can
>> address your use-case?
>>
>> Also, currently there are two types of timestamp according to KIP-32. If
>> the type is LogAppendTime then the timestamp value is the time when broker
>> receives the message. If the type is CreateTime then the timestamp value is
>> determined when producer produces message. With these two definitions, the
>> timestamp should always be positive. We probably need a new type here if we
>> can not put timestamp in the Record Header or the message payload. Does
>> this sound reasonable?
>>
>> Thanks,
>> Dong
>>
>>
>>
>> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have created a KIP to support negative timestamp:
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 228+Negative+record+timestamp+support >> confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support>
>>>
>>> Here are proposed changes: https://github.com/apache/
>>> kafka/compare/trunk...chuhlomin:trunk >> kafka/compare/trunk...chuhlomin:trunk>
>>>
>>> I'm pretty sure that not cases are covered, so comments and suggestions
>>> are welcome.
>>>
>>> Thank you,
>>> Konstantin
> 





Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2017-12-05 Thread Vahid S Hashemian
I forgot to mention that OffsetFetch is another example that is affected 
by such filtering.
Starting with version 2, when offsets of all topics are requested, only 
topics to which the user has Describe access will appear in the result.
If the user does not have Describe access to any topic (assuming Describe 
Group access is in place), an empty list is returned without any 
authorization error.

--Vahid
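[Editor's note: a minimal sketch of the describe-filtered listing being discussed (the same pattern already used for all-topics metadata and OffsetFetch). The Authorizer interface below is a hypothetical stand-in, not the broker's actual API.]

    // Hypothetical sketch: return only the groups the requesting principal may
    // Describe, with an empty list (and no authorization error) otherwise.
    import java.util.List;
    import java.util.stream.Collectors;

    interface Authorizer {  // stand-in for the broker's authorizer
        boolean isAuthorized(String principal, String operation, String resourceType, String resourceName);
    }

    final class ListGroupsFilter {
        static List<String> visibleGroups(Authorizer authorizer, String principal, List<String> allGroupIds) {
            return allGroupIds.stream()
                    .filter(groupId -> authorizer.isAuthorized(principal, "Describe", "Group", groupId))
                    .collect(Collectors.toList());
        }
    }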



From:   "Vahid S Hashemian" 
To: dev@kafka.apache.org
Date:   12/05/2017 02:42 PM
Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of 
ListGroups API



Hi Dong,

Thanks for your message.

My driving point for this KIP was a -seemingly- broken logic rather than a 

particular use case.
I also considered the semantics of the ListGroups API and its expected 
behavior: To me, a user who can fetch/commit offsets in some consumer 
groups should be able to run ListGroups and see those groups (and nothing 
else) in the result. This expected behavior conveys (kind of) a use case 
by itself.

Regarding your second question, I see the external service for managing 
ACLs should not be directly concerned with any Kafka specific 
functionality.
Plus, as Ismael mentioned, we already implement something very similar for 

the TopicMetadataRequest API (when metadata of all topics is requested).

Thanks again for reviewing the KIP closely and sharing your feedback.

Regards.
--Vahid




From:   Dong Lin 
To: dev@kafka.apache.org
Date:   12/04/2017 04:52 PM
Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of 
ListGroups API



Hey Vahid,

Thank you for the explanation.

I am still wondering whether you have a specific use-case for user to be
able to list the groups for which he/she has Describe access. I tried to
think through this but couldn't find a specific use-case for this feature
yet. I understand that admin of the Kafka cluster may want to know the
number of consumer groups in the cluster because the number of consumer
groups can affect the performance of the Kafka service. But this 
motivation
does not hold for non-admin user. I image that a typical user probably
already knows the group it wants to access, in which case he/she can
use DescribeGroupsRequest to query the information of this group. On the
other hand, if user does not already know the list of groups it has access
it, I don't know how user is going to use this information after he/she is
given a list of strings as the group Id. Could you help me understand
whether there is use-case for this feature?

Alternatively, maybe this KIP is solely driven by the idea that user 
should
be able to list the groups for which he/she has Describe access. In this
case, do you think it would be reasonable for the external service which 
is
used to store the ACL to provide this capability of query groupId list
based on the user credential?

Thanks,
Dong


On Mon, Dec 4, 2017 at 3:42 PM, Vahid S Hashemian 
 wrote:

> Hi Dong,
>
> Thanks for reviewing the KIP and providing your feedback.
>
> As you mentioned, the KIP suggests a modification to the semantics of
> ListGroupResponse.
> The main reason for submitting the KIP and suggesting a modification is
> that there are issues with the current one (further explained in the 
KIP):
> 1) listing groups is an all or nothing action, and 2) a Read access on a
> group does not mean the user can list that group.
>
> I understand the solution outlined in the KIP may not be the best one, 
and
> I have been debating that myself even after opening it up for 
discussion.
> That solution was the one discussed with Jason on the corresponding 
JIRA.
> I would also prefer an authorization
> error in case none of the two accesses are in place, which should, at
> least, partially address your concern (the ACL would filter, but would
> also return authorization errors if needed). If you can think of 
something
> on that front please let me know.
>
> Regarding the use case you asked for: if I'm just a user in the cluster
> sharing it with other users, I should be able to see the groups of my
> consumers without being exposed to other users' groups in the cluster.
>
> I hope I addressed your concerns. If I did not, or if I missed anything,
> please let me know. Thanks.
>
> Regards.
> --Vahid
>
>
>
>
> From:   Dong Lin 
> To: dev@kafka.apache.org
> Date:   12/04/2017 01:43 PM
> Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of
> ListGroups API
>
>
>
> I forgot another question. Can you provide a use-case where a user wants
> to
> list all groups for which he/she has the Describe access?
>
> Thanks,
> Dong
>
>
> On Mon, Dec 4, 2017 at 1:40 PM, Dong Lin  wrote:
>
> > Hey Vahid,
> >
> > Thanks for the KIP. If I understand the you correctly, you want client
> to
> > be able to list all the groups for which it currently has the describe
> > access.
> >
> > As of now the 

Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce

2017-12-05 Thread Matthias J. Sax
Thanks a lot for the update! Great write-up! Very clearly explained what
the change will look like!

Looks good to me. No further comments from my side.


-Matthias
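[Editor's note: for readers skimming the thread, a rough sketch of the kind of handler the KIP describes. The exact interface is defined on the KIP page and was still being refined at this point, so the names below are indicative only.]

    // Indicative sketch only; the final interface shape is defined by KIP-210 itself.
    import org.apache.kafka.clients.producer.ProducerRecord;

    interface ProductionExceptionHandler {
        enum Response { CONTINUE, FAIL }
        Response handle(ProducerRecord<byte[], byte[]> record, Exception exception);
    }

    // Example handler: report the failure and let Streams keep processing.
    class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {
        @Override
        public Response handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) {
            System.err.println("Failed to produce to topic " + record.topic() + ": " + exception);
            return Response.CONTINUE;
        }
    }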


On 12/5/17 9:14 AM, Matt Farmer wrote:
> I have updated this KIP accordingly.
> 
> Can you please take a look and let me know if what I wrote looks correct to
> you?
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce
> 
> Thanks!
> 
> Matt
> 
> 
> On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote:
> 
> Hey Matthias, thanks for getting back to me.
> 
> That's fine. But if we add it to `test` package, we don't need to talk
> about it in the KIP. `test` is not public API.
> 
> Yes, that makes sense. It was in the KIP originally because I was, at one
> point, planning on including it. We can remove it now that we’ve decided we
> won’t include it in the public API.
> 
> Understood. That makes sense. We should explain this clearly in the KIP
> and maybe log all other following exceptions at DEBUG level?
> 
> 
> I thought it was clear in the KIP, but I can go back and double check my
> wording and revise it to try and make it clearer.
> 
> I’ll take a look at doing more work on the KIP and the Pull Request
> tomorrow.
> 
> Thanks again!
> 
> On December 4, 2017 at 5:50:33 PM, Matthias J. Sax (matth...@confluent.io)
> wrote:
> 
> Hey,
> 
> About your questions:
> 
 Acknowledged, so is ProducerFencedException the only kind of exception I
 need to change my behavior on? Or are there other types I need to
> check? Is
 there a comprehensive list somewhere?
> 
> I cannot think if any other atm. We should list all fatal exceptions for
> which we don't call the handler and explain why (exception is "global"
> and will affect all other records, too | ProducerFenced is self-healing).
> 
> We started to collect and categorize exception here (not completed yet):
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture#KafkaStreamsArchitecture-TypesofExceptions
> :
> 
> This list should be a good starting point though.
> 
>> I include it in the test package because I have tests that assert that if
>> the record collector impl encounters an Exception and receives a CONTINUE
>> that it actually does CONTINUE.
> 
> That's fine. But if we add it to `test` package, we don't need to talk
> about it in the KIP. `test` is not public API.
> 
>> I didn't want to invoke the handler in places where the CONTINUE or FAIL
>> result would just be ignored. Presumably, after a FAIL has been returned,
>> subsequent exceptions are likely to be repeats or noise from my
>> understanding of the code paths. If this is incorrect we can revisit.
> 
> Understood. That makes sense. We should explain this clearly in the KIP
> and maybe log all other following exceptions at DEBUG level?
> 
> 
> -Matthias
> 
> 
> On 12/1/17 11:43 AM, Matt Farmer wrote:
>> Bump! It's been three days here and I haven't seen any further feedback.
>> Eager to get this resolved, approved, and merged. =)
>>
>> On Tue, Nov 28, 2017 at 9:53 AM Matt Farmer  wrote:
>>
>>> Hi there, sorry for the delay in responding. Last week had a holiday and
>>> several busy work days in it so I'm just now getting around to
> responding.
>>>
 We would only exclude
 exception Streams can handle itself (like ProducerFencedException) --
 thus, if the handler has code to react to this, it would not be bad, as
 this code is just never called.
>>> [...]
 Thus, I am still in favor of not calling the ProductionExceptionHandler
 for fatal exception.
>>>
>>> Acknowledged, so is ProducerFencedException the only kind of exception I
>>> need to change my behavior on? Or are there other types I need to check?
> Is
>>> there a comprehensive list somewhere?
>>>
 About the "always continue" case. Sounds good to me to remove it (not
 sure why we need it in test package?)
>>>
>>> I include it in the test package because I have tests that assert that if
>>> the record collector impl encounters an Exception and receives a CONTINUE
>>> that it actually does CONTINUE.
>>>
 What is there reasoning for invoking the handler only for the first
 exception?
>>>
>>> I didn't want to invoke the handler in places where the CONTINUE or FAIL
>>> result would just be ignored. Presumably, after a FAIL has been returned,
>>> subsequent exceptions are likely to be repeats or noise from my
>>> understanding of the code paths. If this is incorrect we can revisit.
>>>
>>> Once I get the answers to these questions I can make another pass on the
>>> pull request!
>>>
>>> Matt
>>>
>>> On Mon, Nov 20, 2017 at 4:07 PM Matthias J. Sax 
>>> wrote:
>>>
 Thanks for following up!

 One thought about an older reply from you:

 I strongly disagree here. The purpose of this handler isn't *just*
> to
 make a decision for streams. There may also 

Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2017-12-05 Thread Vahid S Hashemian
Hi Dong,

Thanks for your message.

My driving point for this KIP was a -seemingly- broken logic rather than a 
particular use case.
I also considered the semantics of the ListGroups API and its expected 
behavior: To me, a user who can fetch/commit offsets in some consumer 
groups should be able to run ListGroups and see those groups (and nothing 
else) in the result. This expected behavior conveys (kind of) a use case 
by itself.

Regarding your second question, I see the external service for managing 
ACLs should not be directly concerned with any Kafka specific 
functionality.
Plus, as Ismael mentioned, we already implement something very similar for 
the TopicMetadataRequest API (when metadata of all topics is requested).

Thanks again for reviewing the KIP closely and sharing your feedback.

Regards.
--Vahid




From:   Dong Lin 
To: dev@kafka.apache.org
Date:   12/04/2017 04:52 PM
Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of 
ListGroups API



Hey Vahid,

Thank you for the explanation.

I am still wondering whether you have a specific use-case for user to be
able to list the groups for which he/she has Describe access. I tried to
think through this but couldn't find a specific use-case for this feature
yet. I understand that admin of the Kafka cluster may want to know the
number of consumer groups in the cluster because the number of consumer
groups can affect the performance of the Kafka service. But this 
motivation
does not hold for non-admin user. I image that a typical user probably
already knows the group it wants to access, in which case he/she can
use DescribeGroupsRequest to query the information of this group. On the
other hand, if user does not already know the list of groups it has access
it, I don't know how user is going to use this information after he/she is
given a list of strings as the group Id. Could you help me understand
whether there is use-case for this feature?

Alternatively, maybe this KIP is solely driven by the idea that user 
should
be able to list the groups for which he/she has Describe access. In this
case, do you think it would be reasonable for the external service which 
is
used to store the ACL to provide this capability of query groupId list
based on the user credential?

Thanks,
Dong


On Mon, Dec 4, 2017 at 3:42 PM, Vahid S Hashemian 
 wrote:

> Hi Dong,
>
> Thanks for reviewing the KIP and providing your feedback.
>
> As you mentioned, the KIP suggests a modification to the semantics of
> ListGroupResponse.
> The main reason for submitting the KIP and suggesting a modification is
> that there are issues with the current one (further explained in the 
KIP):
> 1) listing groups is an all or nothing action, and 2) a Read access on a
> group does not mean the user can list that group.
>
> I understand the solution outlined in the KIP may not be the best one, 
and
> I have been debating that myself even after opening it up for 
discussion.
> That solution was the one discussed with Jason on the corresponding 
JIRA.
> I would also prefer an authorization
> error in case none of the two accesses are in place, which should, at
> least, partially address your concern (the ACL would filter, but would
> also return authorization errors if needed). If you can think of 
something
> on that front please let me know.
>
> Regarding the use case you asked for: if I'm just a user in the cluster
> sharing it with other users, I should be able to see the groups of my
> consumers without being exposed to other users' groups in the cluster.
>
> I hope I addressed your concerns. If I did not, or if I missed anything,
> please let me know. Thanks.
>
> Regards.
> --Vahid
>
>
>
>
> From:   Dong Lin 
> To: dev@kafka.apache.org
> Date:   12/04/2017 01:43 PM
> Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of
> ListGroups API
>
>
>
> I forgot another question. Can you provide a use-case where a user wants
> to
> list all groups for which he/she has the Describe access?
>
> Thanks,
> Dong
>
>
> On Mon, Dec 4, 2017 at 1:40 PM, Dong Lin  wrote:
>
> > Hey Vahid,
> >
> > Thanks for the KIP. If I understand the you correctly, you want client
> to
> > be able to list all the groups for which it currently has the describe
> > access.
> >
> > As of now the ListGroupRequest does not allow user to specify the 
group.
> > If user does not have the Describe Cluster access, ListGroupResponse
> will
> > return error. This KIP proposes to change the semantics of
> > ListGroupsResponse such that ListGroupResponse will return the subset 
of
> > groups for which the user has the Describe access. And if the does not
> have
> > Describe access to any group, ListGroupResponse will return an empty
> list
> > with no error.
> >
> > In my opinion this changes the semantics of ListGroupsResponse in a
> > counter-intuitive way. Usually we use the ACL to determine whether the
> 

Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2017-12-05 Thread Vahid S Hashemian
Hi Ismael,

Thanks for the pointer.
That's a good example of how we already implemented a similar filtering.

--Vahid



From:   Ismael Juma 
To: dev 
Date:   12/05/2017 01:24 AM
Subject:Re: [DISCUSS] KIP-231: Improve the Required ACL of 
ListGroups API
Sent by:isma...@gmail.com



One comment below.

On Mon, Dec 4, 2017 at 11:40 PM, Dong Lin  wrote:

> In my opinion this changes the semantics of ListGroupsResponse in a
> counter-intuitive way. Usually we use the ACL to determine whether the
> operation on the specified object can be performed or not. The response
> should provide either an error message or the result for the specified
> object. I couldn't remember a case where the ACL is used to filter the
> result without providing error.
>

We do this in the metadata request that requests all topics. We just 
return
the topics that the user has describe access to.

Ismael






Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Colin McCabe
On Tue, Dec 5, 2017, at 13:13, Jan Filipiak wrote:
> Hi Colin
> 
> Addressing the topic of how to manage slots from the other thread.
> With tcp connections all this comes for free essentially.

Hi Jan,

I don't think that it's accurate to say that cache management "comes for
free" by coupling the incremental fetch session with the TCP session. 
When a new TCP session is started by a fetch request, you still have to
decide whether to grant that request an incremental fetch session or
not.  If your answer is that you always grant the request, I would argue
that you do not have cache management.

I guess you could argue that timeouts are cache management, but I don't
find that argument persuasive.  Anyone could just create a lot of TCP
sessions and use a lot of resources, in that case.  So there is
essentially no limit on memory use.  In any case, TCP sessions don't
help us implement fetch session timeouts.

> I still would argue we disable it by default and make a flag in the
> broker to ask the leader to maintain the cache while replicating and also only
> have it optional in consumers (default to off) so one can turn it on 
> where it really hurts.  MirrorMaker and audit consumers prominently.

I agree with Jason's point from earlier in the thread.  Adding extra
configuration knobs that aren't really necessary can harm usability. 
Certainly asking people to manually turn on a feature "where it really
hurts" seems to fall in that category, when we could easily enable it
automatically for them.

> 
> Otherwise I left a few remarks in-line, which should help to understand
> my view of the situation better
> 
> Best Jan
> 
> 
> On 05.12.2017 08:06, Colin McCabe wrote:
> > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> >>
> >> On 03.12.2017 21:55, Colin McCabe wrote:
> >>> On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
>  Thanks for the explanation, Colin. A few more questions.
> 
> > The session epoch is not complex.  It's just a number which increments
> > on each incremental fetch.  The session epoch is also useful for
> > debugging-- it allows you to match up requests and responses when
> > looking at log files.
>  Currently each request in Kafka has a correlation id to help match the
>  requests and responses. Is epoch doing something differently?
> >>> Hi Becket,
> >>>
> >>> The correlation ID is used within a single TCP session, to uniquely
> >>> associate a request with a response.  The correlation ID is not unique
> >>> (and has no meaning) outside the context of that single TCP session.
> >>>
> >>> Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> >>> tries to hide that information from the upper layers of the code.  So
> >>> when you submit a request to NetworkClient, you don't know if that
> >>> request creates a TCP session, or reuses an existing one.
> > Unfortunately, this doesn't work.  Imagine the client misses an
> > increment fetch response about a partition.  And then the partition is
> > never updated after that.  The client has no way to know about the
> > partition, since it won't be included in any future incremental fetch
> > responses.  And there are no offsets to compare, since the partition is
> > simply omitted from the response.
>  I am curious about in which situation would the follower miss a response
>  of a partition. If the entire FetchResponse is lost (e.g. timeout), the
>  follower would disconnect and retry. That will result in sending a full
>  FetchRequest.
> >>> Basically, you are proposing that we rely on TCP for reliable delivery
> >>> in a distributed system.  That isn't a good idea for a bunch of
> >>> different reasons.  First of all, TCP timeouts tend to be very long.  So
> >>> if the TCP session timing out is your error detection mechanism, you
> >>> have to wait minutes for messages to timeout.  Of course, we add a
> >>> timeout on top of that after which we declare the connection bad and
> >>> manually close it.  But just because the session is closed on one end
> >>> doesn't mean that the other end knows that it is closed.  So the leader
> >>> may have to wait quite a long time before TCP decides that yes,
> >>> connection X from the follower is dead and not coming back, even though
> >>> gremlins ate the FIN packet which the follower attempted to translate.
> >>> If the cache state is tied to that TCP session, we have to keep that
> >>> cache around for a much longer time than we should.
> >> Hi,
> >>
> >> I see this from a different perspective. The cache expiry time
> >> has the same semantic as idle connection time in this scenario.
> >> It is the time range we expect the client to come back an reuse
> >> its broker side state. I would argue that on close we would get an
> >> extra shot at cleaning up the session state early. As opposed to
> >> always wait for that duration for expiry to happen.
> > Hi Jan,
> >
> > The idea here is that the incremental 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Ted Yu
Thanks for responding, Colin.

bq. If we have a bunch of small fetch sessions and a bigger client comes
in, we might have to evict many small sessions to fit the bigger one.

Suppose there were N small fetch sessions and 1 big fetch session comes in.
If the plan is to use the number of partitions to approximate heap consumption,
that should be good enough, IMHO.
Evicting only one of the N small fetch sessions may not release enough
memory, since the big session would still increase the total partition count a lot.

Cheers
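[Editor's note: a rough sketch of the partition-count-as-heap-proxy eviction described above. Types and names are hypothetical, not broker code.]

    // Rough sketch: approximate each session's heap cost by its partition count and
    // evict as many small sessions as needed to admit a large one.
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    final class PartitionBudgetCache {
        static final class Session {
            final long id;
            final int partitionCount;
            Session(long id, int partitionCount) { this.id = id; this.partitionCount = partitionCount; }
        }

        private final int maxTotalPartitions;  // stands in for a heap budget
        private final List<Session> sessions = new ArrayList<>();

        PartitionBudgetCache(int maxTotalPartitions) { this.maxTotalPartitions = maxTotalPartitions; }

        private int totalPartitions() {
            return sessions.stream().mapToInt(s -> s.partitionCount).sum();
        }

        // Admit the incoming session, evicting the smallest cached sessions first
        // until it fits; a single eviction is often not enough for a big session.
        synchronized boolean add(Session incoming) {
            if (incoming.partitionCount > maxTotalPartitions) {
                return false;  // too large to cache at all
            }
            sessions.sort(Comparator.comparingInt((Session s) -> s.partitionCount));
            while (totalPartitions() + incoming.partitionCount > maxTotalPartitions) {
                sessions.remove(0);
            }
            sessions.add(incoming);
            return true;
        }
    }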

On Tue, Dec 5, 2017 at 1:44 PM, Colin McCabe  wrote:

> On Tue, Dec 5, 2017, at 11:24, Ted Yu wrote:
> > bq. We also have a tunable for total number of cache slots. We never
> > cache
> > more than this number of incremental fetch sessions.
> >
> > Is it possible to manage the cache based on heap consumption instead of
> > number of slots ?
> > It seems heap estimation can be done by counting PartitionData (along
> > with overhead for related Map structure).
>
> Hi Ted,
>
> That's an interesting idea.  I think it starts to get complicated,
> though.
>
> For example, suppose we later implement incrementally adding partitions
> to the fetch session.  When a fetch session adds more partitions, it
> uses more memory.  So should this trigger an eviction?
>
> If we have a bunch of small fetch sessions and a bigger client comes in,
> we might have to evict many small sessions to fit the bigger one.  But
> we probably do want to fit the bigger one in, since bigger requests gain
> proportionally more from being incremental.
>
> [ Small digression: In general fetch requests have some fixed cost plus
> a variable cost based on the number of partitions.  The more partitions
> you add, the more the variable cost comes to dominate.  Therefore, it is
> especially good to make big fetch requests into incremental fetch
> requests.  Small fetch requests for one or two partitions may not gain
> much, since their cost is dominated by the fixed cost anyway (message
> header, TCP overhead, IP packet overhead, etc.) ]
>
> Overall, I would still lean towards limiting the number of incremental
> fetch sessions, rather than trying to create a per-partition data memory
> limit.  I think the complexity is probably not worth it.  The memory
> limit is more of a sanity check anyway, than a fine-grained limit.  If
> we can get the really big clients using incremental fetches, and the
> followers using incremental fetches, we have captured most of the
> benefits.  I'm curious if there is a more elegant way to limit
> per-partition that I may have missed, though?
>
> best,
> Colin
>
>
> >
> > Cheers
> >
> > On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe 
> wrote:
> >
> > > On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the response. A couple replies:
> > > >
> > > >
> > > > > I’m a bit ambivalent about letting the client choose the session
> > > > > timeout.  What if clients choose timeouts that are too long?
> Hmm
> > > > > I do agree the timeout should be sized proportional to
> > > > > max.poll.interval.ms.
> > > >
> > > >
> > > > We have solved this in other cases by letting the broker enforce a
> > > > maximum timeout. After thinking about it a bit, it's probably
> overkill
> > > in this
> > > > case since the caching is just an optimization. Instead of stressing
> over
> > > > timeouts and such, I am actually wondering if we just need a
> reasonable
> > > > session cache eviction policy. For example, when the number of slots
> is
> > > > exceeded, perhaps you evict the session with the fewest partitions
> or the
> > > > one with the largest interval between fetches. We could give
> priority to
> > > > the replicas. Perhaps this might let us get rid of a few of the
> configs.
> > >
> > > I agree that it would be nice to get rid of the tunable for eviction
> > > time.  However, I'm concerned that if we do, we might run into cache
> > > thrashing.  For example, if we have N cache slots and N+1 clients that
> > > are all fetching continuously, we might have to evict a client on every
> > > single fetch.  It would be much better to give a cache slot to N
> clients
> > > and let the last client do full fetch requests.
> > >
> > > Perhaps we could mitigate this problem by evicting the smallest fetch
> > > session-- the one that is for the smallest number of partitions.  This
> > > would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
> > > to get priority.  But then you run into the problem where someone
> > > fetches a huge number of partitions, and then goes away for a long
> time,
> > > and you never reuse that cache memory.
> > >
> > > How about this approach?  We have a tunable for minimum eviction time
> > > (default 2 minutes).  We cannot evict a client before this timeout has
> > > expired.  We also have a tunable for total number of cache slots.  We
> > > never cache more than this number of incremental fetch sessions.
> > >
> 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Colin McCabe
On Tue, Dec 5, 2017, at 11:24, Ted Yu wrote:
> bq. We also have a tunable for total number of cache slots. We never
> cache
> more than this number of incremental fetch sessions.
> 
> Is it possible to manage the cache based on heap consumption instead of
> number of slots ?
> It seems heap estimation can be done by counting PartitionData (along
> with overhead for related Map structure).

Hi Ted,

That's an interesting idea.  I think it starts to get complicated,
though.

For example, suppose we later implement incrementally adding partitions
to the fetch session.  When a fetch session adds more partitions, it
uses more memory.  So should this trigger an eviction?

If we have a bunch of small fetch sessions and a bigger client comes in,
we might have to evict many small sessions to fit the bigger one.  But
we probably do want to fit the bigger one in, since bigger requests gain
proportionally more from being incremental.

[ Small digression: In general fetch requests have some fixed cost plus
a variable cost based on the number of partitions.  The more partitions
you add, the more the variable cost comes to dominate.  Therefore, it is
especially good to make big fetch requests into incremental fetch
requests.  Small fetch requests for one or two partitions may not gain
much, since their cost is dominated by the fixed cost anyway (message
header, TCP overhead, IP packet overhead, etc.) ]

Overall, I would still lean towards limiting the number of incremental
fetch sessions, rather than trying to create a per-partition data memory
limit.  I think the complexity is probably not worth it.  The memory
limit is more of a sanity check anyway, than a fine-grained limit.  If
we can get the really big clients using incremental fetches, and the
followers using incremental fetches, we have captured most of the
benefits.  I'm curious if there is a more elegant way to limit
per-partition that I may have missed, though?

best,
Colin
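[Editor's note: for concreteness, a rough sketch of the eviction ordering discussed in this thread (and quoted further below in this message): recently active sessions, then followers, then larger sessions, then newer ones are preferred. Names are hypothetical, not the actual broker code.]

    // Hypothetical sketch of the eviction ordering: the lowest-priority cached
    // session under this comparator is the one evicted when the cache is full.
    import java.util.Comparator;

    final class CachedFetchSession {
        long lastUsedMs;      // last time this session fetched
        boolean isFollower;   // follower sessions are preferred over consumer sessions
        int partitionCount;   // larger sessions gain more from being incremental
        long createdMs;       // newer sessions win the final tie-break

        static Comparator<CachedFetchSession> evictionPriority(long nowMs, long minLifetimeMs) {
            return Comparator
                    .comparing((CachedFetchSession s) -> nowMs - s.lastUsedMs <= minLifetimeMs) // recently active kept
                    .thenComparing((CachedFetchSession s) -> s.isFollower)                      // followers kept
                    .thenComparingInt((CachedFetchSession s) -> s.partitionCount)               // bigger sessions kept
                    .thenComparingLong((CachedFetchSession s) -> s.createdMs);                  // newer sessions kept
        }
    }

The eviction candidate is then simply the minimum under this ordering, e.g. Collections.min(sessions, CachedFetchSession.evictionPriority(now, minLifetimeMs)).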


> 
> Cheers
> 
> On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe  wrote:
> 
> > On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > > Hi Colin,
> > >
> > > Thanks for the response. A couple replies:
> > >
> > >
> > > > I’m a bit ambivalent about letting the client choose the session
> > > > timeout.  What if clients choose timeouts that are too long? Hmm
> > > > I do agree the timeout should be sized proportional to
> > > > max.poll.interval.ms.
> > >
> > >
> > > We have solved this in other cases by letting the broker enforce a
> > > maximum timeout. After thinking about it a bit, it's probably overkill
> > in this
> > > case since the caching is just an optimization. Instead of stressing over
> > > timeouts and such, I am actually wondering if we just need a reasonable
> > > session cache eviction policy. For example, when the number of slots is
> > > exceeded, perhaps you evict the session with the fewest partitions or the
> > > one with the largest interval between fetches. We could give priority to
> > > the replicas. Perhaps this might let us get rid of a few of the configs.
> >
> > I agree that it would be nice to get rid of the tunable for eviction
> > time.  However, I'm concerned that if we do, we might run into cache
> > thrashing.  For example, if we have N cache slots and N+1 clients that
> > are all fetching continuously, we might have to evict a client on every
> > single fetch.  It would be much better to give a cache slot to N clients
> > and let the last client do full fetch requests.
> >
> > Perhaps we could mitigate this problem by evicting the smallest fetch
> > session-- the one that is for the smallest number of partitions.  This
> > would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
> > to get priority.  But then you run into the problem where someone
> > fetches a huge number of partitions, and then goes away for a long time,
> > and you never reuse that cache memory.
> >
> > How about this approach?  We have a tunable for minimum eviction time
> > (default 2 minutes).  We cannot evict a client before this timeout has
> > expired.  We also have a tunable for total number of cache slots.  We
> > never cache more than this number of incremental fetch sessions.
> >
> > Sessions become eligible for eviction after 2 minutes, whether or not
> > the session is active.
> > Fetch Request A will evict Fetch Request B if and only if:
> > 1. A has been active in the last 2 minutes and B has not, OR
> > 2. A was made by a follower and B was made by a consumer, OR
> > 3. A has more partitions than B, OR
> > 4. A is newer than B
> >
> > Then, in a setup where consumers are fetching different numbers of
> > partitions, we will eventually converge on giving incremental fetch
> > sessions to the big consumers, and not to the small consumers.  In a
> > setup where consumers are all of equal size but the cache is too small
> > for all of them, we still thrash, but slowly.  Nobody can be evicted
> > before their 2 minutes are up.  So in 

[jira] [Created] (KAFKA-6312) Add documentation about kafka-consumer-groups.sh's ability to set/change offsets

2017-12-05 Thread James Cheng (JIRA)
James Cheng created KAFKA-6312:
--

 Summary: Add documentation about kafka-consumer-groups.sh's 
ability to set/change offsets
 Key: KAFKA-6312
 URL: https://issues.apache.org/jira/browse/KAFKA-6312
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: James Cheng


KIP-122 added the ability for kafka-consumer-groups.sh to reset/change consumer 
offsets at a fine-grained level.

There is documentation on it in the kafka-consumer-groups.sh usage text. 

There is no such documentation on the kafka.apache.org website. We should add 
some documentation to the website, so that users can read about the 
functionality without having the tools installed.
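[Editor's note: for illustration, the kind of invocation such documentation would cover. The group and topic names below are made up, and the full flag list (e.g. --to-latest, --to-datetime, --shift-by) is in the tool's usage text.]

    # Dry run of a reset to the earliest offsets; add --execute to actually apply it.
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group my-group --topic my-topic \
      --reset-offsets --to-earliest --dry-run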





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Konstantin Chukhlomin
Hi Dong,

Currently we are storing a historical timestamp in the message.

What we are trying to achieve is to make it possible to do a Kafka lookup 
by timestamp. Ideally I would use `offsetsForTimes` to find articles published 
in the 1910s (if we are storing articles on the log).

So the first two suggestions don't really cover our use-case.

We could create a new timestamp type like "HistoricalTimestamp" or 
"MaybeNegativeTimestamp".
The only difference between this one and CreateTime is that it could be 
negative.
I tend to use CreateTime for this purpose because it's easier to understand 
from the user's perspective as a timestamp which the publisher can set.

Thanks,
Konstantin
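[Editor's note: for illustration, the lookup described above would look roughly like this, assuming negative (pre-1970) timestamps were accepted end to end by brokers and clients. The topic name is made up.]

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Instant;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    public class HistoricalLookup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition articles = new TopicPartition("articles", 0);
                long jan1910 = Instant.parse("1910-01-01T00:00:00Z").toEpochMilli(); // negative epoch millis
                Map<TopicPartition, OffsetAndTimestamp> result =
                        consumer.offsetsForTimes(Collections.singletonMap(articles, jan1910));
                OffsetAndTimestamp hit = result.get(articles);
                if (hit != null) {
                    consumer.assign(Collections.singleton(articles));
                    consumer.seek(articles, hit.offset()); // start reading from the 1910s onward
                }
            }
        }
    }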

> On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
> 
> Hey Konstantin,
> 
> Thanks for the KIP. I have a few questions below.
> 
> Strictly speaking Kafka actually allows you to store historical data. And
> user are free to encode arbitrary timestamp field in their Kafka message.
> For example, your Kafka message can currently have Json or Avro format and
> you can put a timestamp field there. Do you think that could address your
> use-case?
> 
> Alternatively, KIP-82 introduced Record Header in Kafka and you can also
> define your customized key/value pair in the header. Do you think this can
> address your use-case?
> 
> Also, currently there are two types of timestamp according to KIP-32. If
> the type is LogAppendTime then the timestamp value is the time when broker
> receives the message. If the type is CreateTime then the timestamp value is
> determined when producer produces message. With these two definitions, the
> timestamp should always be positive. We probably need a new type here if we
> can not put timestamp in the Record Header or the message payload. Does
> this sound reasonable?
> 
> Thanks,
> Dong
> 
> 
> 
> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin 
> wrote:
> 
>> Hi all,
>> 
>> I have created a KIP to support negative timestamp:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 228+Negative+record+timestamp+support > confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support>
>> 
>> Here are proposed changes: https://github.com/apache/
>> kafka/compare/trunk...chuhlomin:trunk > kafka/compare/trunk...chuhlomin:trunk>
>> 
>> I'm pretty sure that not cases are covered, so comments and suggestions
>> are welcome.
>> 
>> Thank you,
>> Konstantin



Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Jan Filipiak

Hi Colin

Addressing the topic of how to manage slots from the other thread.
With tcp connections all this comes for free essentially.
I still would argue we disable it by default and make a flag in the broker
to ask the leader to maintain the cache while replicating and also only
have it optional in consumers (default to off) so one can turn it on 
where it really hurts.

MirrorMaker and audit consumers prominently.

Otherwise I left a few remarks in-line, which should help to understand
my view of the situation better

Best Jan


On 05.12.2017 08:06, Colin McCabe wrote:

On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:


On 03.12.2017 21:55, Colin McCabe wrote:

On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:

Thanks for the explanation, Colin. A few more questions.


The session epoch is not complex.  It's just a number which increments
on each incremental fetch.  The session epoch is also useful for
debugging-- it allows you to match up requests and responses when
looking at log files.

Currently each request in Kafka has a correlation id to help match the
requests and responses. Is epoch doing something differently?

Hi Becket,

The correlation ID is used within a single TCP session, to uniquely
associate a request with a response.  The correlation ID is not unique
(and has no meaning) outside the context of that single TCP session.

Keep in mind, NetworkClient is in charge of TCP sessions, and generally
tries to hide that information from the upper layers of the code.  So
when you submit a request to NetworkClient, you don't know if that
request creates a TCP session, or reuses an existing one.

Unfortunately, this doesn't work.  Imagine the client misses an
increment fetch response about a partition.  And then the partition is
never updated after that.  The client has no way to know about the
partition, since it won't be included in any future incremental fetch
responses.  And there are no offsets to compare, since the partition is
simply omitted from the response.

I am curious about in which situation would the follower miss a response
of a partition. If the entire FetchResponse is lost (e.g. timeout), the
follower would disconnect and retry. That will result in sending a full
FetchRequest.

Basically, you are proposing that we rely on TCP for reliable delivery
in a distributed system.  That isn't a good idea for a bunch of
different reasons.  First of all, TCP timeouts tend to be very long.  So
if the TCP session timing out is your error detection mechanism, you
have to wait minutes for messages to timeout.  Of course, we add a
timeout on top of that after which we declare the connection bad and
manually close it.  But just because the session is closed on one end
doesn't mean that the other end knows that it is closed.  So the leader
may have to wait quite a long time before TCP decides that yes,
connection X from the follower is dead and not coming back, even though
gremlins ate the FIN packet which the follower attempted to translate.
If the cache state is tied to that TCP session, we have to keep that
cache around for a much longer time than we should.

Hi,

I see this from a different perspective. The cache expiry time
has the same semantic as idle connection time in this scenario.
It is the time range we expect the client to come back an reuse
its broker side state. I would argue that on close we would get an
extra shot at cleaning up the session state early. As opposed to
always wait for that duration for expiry to happen.

Hi Jan,

The idea here is that the incremental fetch cache expiry time can be
much shorter than the TCP session timeout.  In general the TCP session
timeout is common to all TCP connections, and very long.  To make these
numbers a little more concrete, the TCP session timeout is often
configured to be 2 hours on Linux.  (See
https://www.cyberciti.biz/tips/linux-increasing-or-decreasing-tcp-sockets-timeouts.html
)  The timeout I was proposing for incremental fetch sessions was one or
two minutes at most.

Currently this is taken care of by
connections.max.idle.ms on the broker and defaults to something of few 
minutes.

Also something we could let the client change if we really wanted to.
So there is no need to worry about coupling our implementation to some 
timeouts

given by the OS, with TCP one always has full control over the worst times +
one gets the extra shot cleaning up early when the close comes through. 
Which

is the majority of the cases.




Secondly, from a software engineering perspective, it's not a good idea
to try to tightly tie together TCP and our code.  We would have to
rework how we interact with NetworkClient so that we are aware of things
like TCP sessions closing or opening.  We would have to be careful
preserve the ordering of incoming messages when doing things like
putting incoming requests on to a queue to be processed by multiple
threads.  It's just a lot of complexity to add, and there's no upside.

I see the point here. And I had a 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Dong Lin
Hey Konstantin,

Thanks for the KIP. I have a few questions below.

Strictly speaking Kafka actually allows you to store historical data. And
users are free to encode an arbitrary timestamp field in their Kafka message.
For example, your Kafka message can currently have JSON or Avro format and
you can put a timestamp field there. Do you think that could address your
use-case?

Alternatively, KIP-82 introduced Record Header in Kafka and you can also
define your customized key/value pair in the header. Do you think this can
address your use-case?

Also, currently there are two types of timestamp according to KIP-32. If
the type is LogAppendTime then the timestamp value is the time when the broker
receives the message. If the type is CreateTime then the timestamp value is
determined when the producer produces the message. With these two definitions,
the timestamp should always be positive. We probably need a new type here if we
cannot put the timestamp in the Record Header or the message payload. Does
this sound reasonable?

Thanks,
Dong



On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin 
wrote:

> Hi all,
>
> I have created a KIP to support negative timestamp:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
>
> Here are proposed changes: https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk
>
> I'm pretty sure that not all cases are covered, so comments and suggestions
> are welcome.
>
> Thank you,
> Konstantin


Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Konstantin Chukhlomin
Yes, the point of that new class is to return the same timestamp no matter if it's
negative or not.
And 5000 BC would be a valid timestamp.
But I haven't tried to use streams with such historical data yet.
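
(For anyone wanting to experiment with the idea while the KIP is discussed, here is a
minimal sketch of an extractor with that behaviour. The class name mirrors the one
mentioned in this thread, but the body is purely illustrative and not the KIP's final API.)

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Illustrative sketch only: keep the record's embedded timestamp even when
    // it is negative, instead of treating negative values as invalid the way
    // the built-in extractors do.
    public class KeepTimestampOnInvalidTimestamp implements TimestampExtractor {

        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            // Pass the embedded timestamp through unchanged, so a record stamped
            // with, say, a date in 5000 BC keeps its (negative) timestamp.
            return record.timestamp();
        }
    }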

> On Dec 5, 2017, at 3:02 PM, Ted Yu  wrote:
> 
> In the diff you gave, onInvalidTimestamp() doesn't have any check.
> 
> What if the timestamp corresponds to 5000 BC ? Is that still allowed ?
> 
> Cheers
> 
> On Tue, Dec 5, 2017 at 10:29 AM, Konstantin Chukhlomin 
> wrote:
> 
>> Hi Ted,
>> 
>> Thank you for the response.
>> I made a relevant changes to the KIP.
>> 
>>> On Dec 5, 2017, at 11:59 AM, Ted Yu  wrote:
>>> 
>>> In KeepTimestampOnInvalidTimestamp, there should be check that
>> timestamp is
>>> < 0.
>>> This would protect against future change to onInvalidTimestamp()
>> callback.
>> 
>> Not quite follow here, could you tell more?
>> 
>>> Wednesday, December 31, 1969 11:59:59 PM UTC was in the past. Can you
>>> enrich Motivation section on why the proposal is made (writing data
>>> generated nowadays wouldn't result in negative timestamp)?
>>> 
>>> In Compatibility section, there are two questions without answers.
>>> Are you going to fill out later ?
>>> 
>>> Cheers
>>> 
>>> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
>> chuhlo...@gmail.com>
>>> wrote:
>>> 
 Hi all,
 
 I have created a KIP to support negative timestamp:
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
 
 Here are proposed changes: https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk
 
 I'm pretty sure that not all cases are covered, so comments and suggestions
 are welcome.
 
 Thank you,
 Konstantin
>> 
>> 



Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Ted Yu
In the diff you gave, onInvalidTimestamp() doesn't have any check.

What if the timestamp corresponds to 5000 BC ? Is that still allowed ?

Cheers

On Tue, Dec 5, 2017 at 10:29 AM, Konstantin Chukhlomin 
wrote:

> Hi Ted,
>
> Thank you for the response.
> I made a relevant changes to the KIP.
>
> > On Dec 5, 2017, at 11:59 AM, Ted Yu  wrote:
> >
> > In KeepTimestampOnInvalidTimestamp, there should be check that
> timestamp is
> > < 0.
> > This would protect against future change to onInvalidTimestamp()
> callback.
>
> Not quite follow here, could you tell more?
>
> > Wednesday, December 31, 1969 11:59:59 PM UTC was in the past. Can you
> > enrich Motivation section on why the proposal is made (writing data
> > generated nowadays wouldn't result in negative timestamp)?
> >
> > In Compatibility section, there are two questions without answers.
> > Are you going to fill out later ?
> >
> > Cheers
> >
> > On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin <
> chuhlo...@gmail.com>
> > wrote:
> >
> >> Hi all,
> >>
> >> I have created a KIP to support negative timestamp:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
> >>
> >> Here are proposed changes: https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk
> >>
> >> I'm pretty sure that not all cases are covered, so comments and suggestions
> >> are welcome.
> >>
> >> Thank you,
> >> Konstantin
>
>


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Ted Yu
bq. We also have a tunable for total number of cache slots. We never cache
more than this number of incremental fetch sessions.

Is it possible to manage the cache based on heap consumption instead of the
number of slots?
It seems a heap estimate can be made by counting PartitionData entries (along
with the overhead of the related Map structure).
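
(A back-of-the-envelope version of that estimate could look like the sketch below;
the per-entry byte counts are rough assumptions for illustration, not measured numbers.)

    // Rough, illustrative heap estimate for an incremental fetch session cache.
    // The overhead constants are assumptions; real numbers would have to be
    // measured, e.g. from a heap histogram.
    public final class FetchSessionCacheSizeEstimator {

        private static final long ASSUMED_BYTES_PER_PARTITION = 200L; // PartitionData + map entry + key
        private static final long ASSUMED_BYTES_PER_SESSION = 1024L;  // session object + map + bookkeeping

        public static long estimateBytes(final int sessions, final int partitionsPerSession) {
            return sessions * (ASSUMED_BYTES_PER_SESSION
                    + (long) partitionsPerSession * ASSUMED_BYTES_PER_PARTITION);
        }

        public static void main(final String[] args) {
            // e.g. 1000 cached sessions, each tracking 2000 partitions
            System.out.printf("~%d MB%n", estimateBytes(1000, 2000) / (1024 * 1024));
        }
    }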

Cheers

On Tue, Dec 5, 2017 at 11:02 AM, Colin McCabe  wrote:

> On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> > Hi Colin,
> >
> > Thanks for the response. A couple replies:
> >
> >
> > > I’m a bit ambivalent about letting the client choose the session
> > > timeout.  What if clients choose timeouts that are too long? Hmm
> > > I do agree the timeout should be sized proportional to
> > > max.poll.interval.ms.
> >
> >
> > We have solved this in other cases by letting the broker enforce a
> > maximum timeout. After thinking about it a bit, it's probably overkill
> in this
> > case since the caching is just an optimization. Instead of stressing over
> > timeouts and such, I am actually wondering if we just need a reasonable
> > session cache eviction policy. For example, when the number of slots is
> > exceeded, perhaps you evict the session with the fewest partitions or the
> > one with the largest interval between fetches. We could give priority to
> > the replicas. Perhaps this might let us get rid of a few of the configs.
>
> I agree that it would be nice to get rid of the tunable for eviction
> time.  However, I'm concerned that if we do, we might run into cache
> thrashing.  For example, if we have N cache slots and N+1 clients that
> are all fetching continuously, we might have to evict a client on every
> single fetch.  It would be much better to give a cache slot to N clients
> and let the last client do full fetch requests.
>
> Perhaps we could mitigate this problem by evicting the smallest fetch
> session-- the one that is for the smallest number of partitions.  This
> would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
> to get priority.  But then you run into the problem where someone
> fetches a huge number of partitions, and then goes away for a long time,
> and you never reuse that cache memory.
>
> How about this approach?  We have a tunable for minimum eviction time
> (default 2 minutes).  We cannot evict a client before this timeout has
> expired.  We also have a tunable for total number of cache slots.  We
> never cache more than this number of incremental fetch sessions.
>
> Sessions become eligible for eviction after 2 minutes, whether or not
> the session is active.
> Fetch Request A will evict Fetch Request B if and only if:
> 1. A has been active in the last 2 minutes and B has not, OR
> 2. A was made by a follower and B was made by a consumer, OR
> 3. A has more partitions than B, OR
> 4. A is newer than B
>
> Then, in a setup where consumers are fetching different numbers of
> partitions, we will eventually converge on giving incremental fetch
> sessions to the big consumers, and not to the small consumers.  In a
> setup where consumers are all of equal size but the cache is too small
> for all of them, we still thrash, but slowly.  Nobody can be evicted
> before their 2 minutes are up.  So in general, the overhead of the extra
> full requests is still low.  If someone makes a big request and then
> shuts down, they get cleaned up after 2 minutes, because of condition
> #1.  And there are only two tunables needed: cache size and eviction
> time.
>
> >
> > The main reason is if there is a bug in the incremental fetch feature.
> > >
> >
> > This was in response to my question about removing the consumer config.
> > And sure, any new feature may have bugs, but that's what we have testing
> for
> > ;). Users can always fall back to a previous version if there are any
> > major problems. As you know, it's tough removing configs once they are
> there,
> > so I think we should try to add them only if they make sense in the long
> > term.
>
> That's a fair point.  I guess if we do need to disable incremental
> fetches in production because of a bug, we can modify the broker
> configuration to do so (by setting 0 cache slots).
>
> best,
> Colin
>
> >
> > Thanks,
> > Jason
> >
> > On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe 
> wrote:
> >
> > > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > > >
> > > >
> > > > On 03.12.2017 21:55, Colin McCabe wrote:
> > > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > > >> Thanks for the explanation, Colin. A few more questions.
> > > > >>
> > > > >>> The session epoch is not complex.  It's just a number which
> > > increments
> > > > >>> on each incremental fetch.  The session epoch is also useful for
> > > > >>> debugging-- it allows you to match up requests and responses when
> > > > >>> looking at log files.
> > > > >> Currently each request in Kafka has a correlation id to help
> match the
> > > > >> requests and responses. Is epoch 

Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2017-12-05 Thread Colin McCabe
On Tue, Dec 5, 2017, at 06:01, Rajini Sivaram wrote:
> Hi Colin,
> 
> KAFKA-5722 already has an owner, so I didn't want to confuse the two
> KIPs.  They can be done independently of each other. The goal is to try and
> validate every config to the minimum validation already in the broker for
> the static configs, but in some cases to a more restrictive level. So a
> typo like a file-not-found or class-not-found would definitely fail the
> AlterConfigs request (validation is performed by AlterConfigs regardless
> of validateOnly flag). I am working out the additional validation I can
> perform as I implement updates for each config. For example,
> inter-broker keystore update will not be allowed unless it can be
> verified against the currently configured truststore.

HI Rajini,

I agree.  It's probably better to avoid expanding the scope of KIP-226. 
I hope we can get to KAFKA-5722 soon, though, since it will really
improve the user experience for this feature.

regards,
Colin
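
(As a side note for readers following the thread: once alterConfigs accepts these dynamic
broker configs, the validation flow described above can be exercised programmatically.
A hedged sketch against the existing AdminClient API, with the broker id and keystore
path as placeholders:)

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.AlterConfigsOptions;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class ValidateDynamicBrokerConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Broker id "0" and the keystore path are placeholders.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
                Config update = new Config(Collections.singleton(
                        new ConfigEntry("ssl.keystore.location", "/path/to/new/keystore.jks")));

                // validateOnly(true) asks the broker to validate without applying, so a
                // typo or an unverifiable keystore fails here with an error instead of
                // lingering silently in ZooKeeper.
                admin.alterConfigs(Collections.singletonMap(broker, update),
                                   new AlterConfigsOptions().validateOnly(true))
                     .all()
                     .get();
            }
        }
    }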

> 
> On Sat, Dec 2, 2017 at 10:15 PM, Colin McCabe  wrote:
> 
> > On Tue, Nov 28, 2017, at 14:48, Rajini Sivaram wrote:
> > > Hi Colin,
> > >
> > > Thank you for reviewing the KIP.
> > >
> > > *kafka-configs.sh* will be converted to use *AdminClient* under
> > > KAFKA-5722.
> > > This is targeted for the next release (1.1.0). Under this KIP, we will
> > > implement *AdminClient#alterConfigs* for the dynamic configs listed in
> > > the KIP. This will include validation of the configs and will return
> > > appropriate errors if configs are invalid. Integration tests will also be
> > > added using AdminClient. Only the actual conversion of ConfigCommand to
> > > use AdminClient will be left to be done under KAFKA-5722.
> >
> > Hi Rajini,
> >
> > It seems like there is no KIP yet for KAFKA-5722.  Does it make sense to
> > describe the conversion of kafka-configs.sh to use AdminClient in
> > KIP-226?  Since the AlterConfigs RPCs already exist, it should be pretty
> > straightforward.  This would also let us add some information about how
> > errors will be handled, which is pretty important for users.  For
> > example, will kafka-configs.sh give an error if the user makes a typo
> > when setting a configuration?
> >
> > >
> > > Once KAFKA-5722 is implemented, *kafka-configs.sh* can be used to obtain
> > > the current configuration, which can be redirected to a text file to
> > create a
> > > static *server.properties* file. This should help when downgrading - but
> > > it does require brokers to be running. We can also document how to obtain
> > > the properties using *zookeeper-shell.sh* while downgrading if brokers
> > are
> > > down.
> > >
> > > If we rename properties, we should add the new property to ZK based on
> > > the value of the old property when the upgraded broker starts up. But we
> > > would probably leave the old property as is. The old property will be
> > returned
> > > and used as a synonym only as long as the broker is on a version where it
> > > is still valid. But it can remain in ZK and be updated if downgrading -
> > > it will be up to the user to update the old property if downgrading or
> > > delete it if not needed. Renaming properties is likely to be confusing
> > in any
> > > case even without dynamic configs, so hopefully it will be very rare.
> >
> > Sounds good.
> >
> > best,
> > Colin
> >
> > >
> > >
> > > Rajini
> > >
> > > On Tue, Nov 28, 2017 at 7:47 PM, Colin McCabe 
> > wrote:
> > >
> > > > Hi Rajini,
> > > >
> > > > This seems like a nice improvement!
> > > >
> > > > One thing that is a bit concerning is that, if bin/kafka-configs.sh is
> > > > used, there is no  way for the broker to give feedback or error
> > > > messages.  The broker can't say "sorry, I can't reconfigure that in
> > that
> > > > way."  Or even "that configuration property is not reconfigurable in
> > > > this version of the software."  It seems like in the examples given,
> > > > users will simply set a configuration using bin/kafka-configs.sh, but
> > > > then they have to check the broker log files to see if it could
> > actually
> > > > be applied.  And even if it couldn't be applied, then it still lingers
> > > > in ZooKeeper.
> > > >
> > > > This seems like it would lead to a lot of user confusion, since they
> > get
> > > > no feedback when reconfiguring something.  For example, there will be a
> > > > lot of scenarios where someone finds a reconfiguration command on
> > > > Google, runs it, but then it doesn't do anything because the software
> > > > version is different.  And there's no error message or feedback about
> > > > this.  It just doesn't work.
> > > >
> > > > To prevent this, I think we should convert bin/kafka-configs.sh to use
> > > > AdminClient's AlterConfigsRequest.  This allows us to detect scenarios
> > > > where, because of a typo, different software version, or a value of the
> > > > wrong type (eg. string vs. int), the given configuration cannot be
> > > > 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Colin McCabe
On Tue, Dec 5, 2017, at 08:51, Jason Gustafson wrote:
> Hi Colin,
> 
> Thanks for the response. A couple replies:
> 
> 
> > I’m a bit ambivalent about letting the client choose the session
> > timeout.  What if clients choose timeouts that are too long? Hmm
> > I do agree the timeout should be sized proportional to
> > max.poll.interval.ms.
> 
> 
> We have solved this in other cases by letting the broker enforce a
> maximum timeout. After thinking about it a bit, it's probably overkill in this
> case since the caching is just an optimization. Instead of stressing over
> timeouts and such, I am actually wondering if we just need a reasonable
> session cache eviction policy. For example, when the number of slots is
> exceeded, perhaps you evict the session with the fewest partitions or the
> one with the largest interval between fetches. We could give priority to
> the replicas. Perhaps this might let us get rid of a few of the configs.

I agree that it would be nice to get rid of the tunable for eviction
time.  However, I'm concerned that if we do, we might run into cache
thrashing.  For example, if we have N cache slots and N+1 clients that
are all fetching continuously, we might have to evict a client on every
single fetch.  It would be much better to give a cache slot to N clients
and let the last client do full fetch requests.

Perhaps we could mitigate this problem by evicting the smallest fetch
session-- the one that is for the smallest number of partitions.  This
would allow "big" clients that fetch many partitions (e.g. MirrorMaker)
to get priority.  But then you run into the problem where someone
fetches a huge number of partitions, and then goes away for a long time,
and you never reuse that cache memory.

How about this approach?  We have a tunable for minimum eviction time
(default 2 minutes).  We cannot evict a client before this timeout has
expired.  We also have a tunable for total number of cache slots.  We
never cache more than this number of incremental fetch sessions.

Sessions become eligible for eviction after 2 minutes, whether or not
the session is active.
Fetch Request A will evict Fetch Request B if and only if:
1. A has been active in the last 2 minutes and B has not, OR
2. A was made by a follower and B was made by a consumer, OR
3. A has more partitions than B, OR
4. A is newer than B

Then, in a setup where consumers are fetching different numbers of
partitions, we will eventually converge on giving incremental fetch
sessions to the big consumers, and not to the small consumers.  In a
setup where consumers are all of equal size but the cache is too small
for all of them, we still thrash, but slowly.  Nobody can be evicted
before their 2 minutes are up.  So in general, the overhead of the extra
full requests is still low.  If someone makes a big request and then
shuts down, they get cleaned up after 2 minutes, because of condition
#1.  And there are only two tunables needed: cache size and eviction
time.
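
(To make that priority order a little more concrete, here is an illustrative sketch that
reads the four rules as a lexicographic ordering; the field names and structure are
assumptions, only the rules and the two-minute minimum come from this email.)

    // Illustrative sketch of the eviction rules described above.
    final class FetchSessionEvictionSketch {

        static final long MIN_EVICTION_MS = 2 * 60 * 1000L; // the proposed 2-minute minimum

        static final class Session {
            boolean follower;     // made by a follower rather than a consumer
            int numPartitions;    // partitions tracked by the session
            long lastActiveMs;    // last time the session was used
            long createdMs;       // when the session was created

            boolean recentlyActive(long nowMs) {
                return nowMs - lastActiveMs < MIN_EVICTION_MS;
            }
        }

        /** Incoming request A may evict cached session B only if B is past the
         *  minimum eviction time and A ranks strictly higher than B. */
        static boolean mayEvict(Session a, Session b, long nowMs) {
            if (nowMs - b.createdMs < MIN_EVICTION_MS)
                return false;                                     // nobody is evicted before 2 minutes are up
            if (a.recentlyActive(nowMs) != b.recentlyActive(nowMs))
                return a.recentlyActive(nowMs);                   // rule 1: recently active wins
            if (a.follower != b.follower)
                return a.follower;                                // rule 2: follower beats consumer
            if (a.numPartitions != b.numPartitions)
                return a.numPartitions > b.numPartitions;         // rule 3: more partitions wins
            return a.createdMs > b.createdMs;                     // rule 4: newer wins
        }
    }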

> 
> The main reason is if there is a bug in the incremental fetch feature.
> >
> 
> This was in response to my question about removing the consumer config.
> And sure, any new feature may have bugs, but that's what we have testing for
> ;). Users can always fall back to a previous version if there are any
> major problems. As you know, it's tough removing configs once they are there,
> so I think we should try to add them only if they make sense in the long
> term.

That's a fair point.  I guess if we do need to disable incremental
fetches in production because of a bug, we can modify the broker
configuration to do so (by setting 0 cache slots).

best,
Colin

> 
> Thanks,
> Jason
> 
> On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe  wrote:
> 
> > On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> > >
> > >
> > > On 03.12.2017 21:55, Colin McCabe wrote:
> > > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > > >> Thanks for the explanation, Colin. A few more questions.
> > > >>
> > > >>> The session epoch is not complex.  It's just a number which
> > increments
> > > >>> on each incremental fetch.  The session epoch is also useful for
> > > >>> debugging-- it allows you to match up requests and responses when
> > > >>> looking at log files.
> > > >> Currently each request in Kafka has a correlation id to help match the
> > > >> requests and responses. Is epoch doing something differently?
> > > > Hi Becket,
> > > >
> > > > The correlation ID is used within a single TCP session, to uniquely
> > > > associate a request with a response.  The correlation ID is not unique
> > > > (and has no meaning) outside the context of that single TCP session.
> > > >
> > > > Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> > > > tries to hide that information from the upper layers of the code.  So
> > > > when you submit a request to NetworkClient, you don't know if that
> > > > request creates a TCP session, or reuses an existing one.
> > > 

[jira] [Created] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API

2017-12-05 Thread JIRA
Xavier Léauté created KAFKA-6311:


 Summary: Expose Kafka cluster ID in Connect REST API
 Key: KAFKA-6311
 URL: https://issues.apache.org/jira/browse/KAFKA-6311
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Xavier Léauté
Assignee: Ewen Cheslack-Postava


Connect currently does not expose any information about the Kafka cluster it is 
connected to.
In an environment with multiple Kafka clusters it would be useful to know which 
cluster Connect is talking to. Exposing this information enables programmatic 
discovery of Kafka cluster metadata for the purpose of configuring connectors.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-05 Thread Jason Gustafson
> I believe a lot of users are using the kafka high level consumers, it is
> effectively an **unordered** messaging/streaming pattern. People using high
> level consumers don't actually need any ordering guarantees. In this sense,
> a *shared* subscription in Apache Pulsar seems to be better than current
> Kafka's consumer group model, as it allows the consumption rate to not be
> limited by the number of partitions and to actually grow beyond the number of
> partitions. We do see a lot of operational pain points in production coming
> from consumer lag, which I think is very commonly seen during partition
> rebalancing in a consumer group. Selective acking seems to provide a finer
> granularity of acknowledgment, which can actually be good for avoiding
> consumer lag and avoiding reprocessing of messages during partition rebalances.


Yeah, I'm not sure about this. I'd be interested to understand the design
of this feature a little better. In practice, when ordering is unimportant,
adding partitions seems not too big of a deal. Also, I'm aware of active
efforts to make rebalancing less of a pain point for our users ;)

The last question, from a user's perspective: since both kafka and pulsar are
> distributed pub/sub messaging systems and both of them are at the ASF, is there
> any possibility for these two projects to collaborate, e.g. kafka adopts
> pulsar's messaging model, pulsar can use kafka streams and kafka connect. I
> believe a lot of people on the mailing list might have the same or a similar
> question. From an end-user perspective, if such a collaboration can happen, that
> is going to be great for users and also the ASF. I would like to hear any
> thoughts from kafka committers and pmc members.


I see this a little differently. Although there is some overlap between the
projects, they have quite different underlying philosophies (as Marina
alluded to) and I hope this will take them on different trajectories over
time. That would ultimately benefit users more than having two competing
projects solving all the same use cases. We don't need to try to cram
Pulsar features into Kafka if it's not a good fit and vice versa. At the
same time, where capabilities do overlap, we can try to learn from their
experience and they can learn from ours. The example of message retention
seemed like one of these instances since there are legitimate use cases and
Pulsar's approach has some benefits.


-Jason



On Tue, Dec 5, 2017 at 9:57 AM, Khurrum Nasim 
wrote:

> Hi Marina,
>
>
> On Tue, Dec 5, 2017 at 6:58 AM, Marina Popova 
> wrote:
>
> > Hi,
> > I don't think it would be such a great idea to start modifying the very
> > foundation of Kafka's design to accommodate more and more extra use
> cases.
> > Kafka became so widely adopted and popular because its creator made a
> > brilliant decision to make it a "dumb broker - smart consumer" type of
> > system, where there are no to minimal dependencies between Kafka brokers
> > and Consumers. This is what makes Kafka blazingly fast and truly scalable -
> > able to handle thousands of Consumers with no impact on performance.
> >
>
> I am not sure I agree with this. I think from an end-user perspective, what
> users expect is an ultra simple streaming/messaging system: applications
> send messages, messaging systems store and dispatch them, consumers
> consume the messages and tell the systems that they already consumed the
> messages. IMO whether management is centralized or decentralized
> doesn't really matter here if kafka is able to do things without impacting
> performance.
>
> Sometimes people assume that smarter brokers (like traditional messaging
> brokers) cannot offer high throughput and scalability, because they do
> "too many things". But I took a look at the Pulsar documentation and their
> presentation. There are a few metrics that are very impressive:
>
> https://image.slidesharecdn.com/apachepulsar-171113225233/95/bdam-multitenant-and-georeplication-messaging-with-apache-pulsar-by-matteo-merli-sijie-guo-from-streamlio-2-638.jpg?cb=1510613990
>
> - 1.8 million messages/second per topic partition
> - 99pct producing latency less than 5ms with stronger durability
> - support millions of topics
> - it also supports at-least-once and effectively-once producing
>
> Those metrics sound appealing to me if pulsar supports both streaming and
> queuing. I am wondering if anyone in the community has tried to do
> performance testing or a benchmark between Pulsar and Kafka. I would love to
> see such results, which could help people understand both systems' pros and
> cons.
>
>
> - KN
>
>
>
> >
> > One unfortunate consequence of becoming so popular - is that more and
> more
> > people are trying to fit Kafka into their architectures not 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Konstantin Chukhlomin
Hi Ted, 

Thank you for the response.
I made the relevant changes to the KIP.

> On Dec 5, 2017, at 11:59 AM, Ted Yu  wrote:
> 
> In KeepTimestampOnInvalidTimestamp, there should be check that timestamp is
> < 0.
> This would protect against future change to onInvalidTimestamp() callback.

Not quite follow here, could you tell more?

> Wednesday, December 31, 1969 11:59:59 PM UTC was in the past. Can you
> enrich Motivation section on why the proposal is made (writing data
> generated nowadays wouldn't result in negative timestamp)?
> 
> In Compatibility section, there are two questions without answers.
> Are you going to fill out later ?
> 
> Cheers
> 
> On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin 
> wrote:
> 
>> Hi all,
>> 
>> I have created a KIP to support negative timestamp:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
>> 
>> Here are proposed changes: https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk
>> 
>> I'm pretty sure that not all cases are covered, so comments and suggestions
>> are welcome.
>> 
>> Thank you,
>> Konstantin



Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Colin McCabe
On Sun, Dec 3, 2017, at 16:28, Becket Qin wrote:
> >The correlation ID is used within a single TCP session, to uniquely
> >associate a request with a response.  The correlation ID is not unique
> >(and has no meaning) outside the context of that single TCP session.
> >
> >Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> >tries to hide that information from the upper layers of the code.  So
> >when you submit a request to NetworkClient, you don't know if that
> >request creates a TCP session, or reuses an existing one.
> 
> Hmm, the correlation id is application-level information in each Kafka
> request. It is maintained by o.a.k.c.NetworkClient. It is not associated
> with TCP sessions. So even if the TCP session disconnects and reconnects,
> the correlation id is not reset and will still be monotonically increasing.

Hi Becket,

That's a fair point.  I was thinking of previous RPC systems I worked
with.  But in Kafka, you're right that the correlation ID is maintained
by a single counter in NetworkClient, rather than being a counter
per-connection.

In any case, the correlation ID is there in order to associate a request
with a response within a single TCP session.  It's not unique, even on a
single node, if there is more than one NetworkClient.  It will get reset
to 0 any time we restart the process or re-create the NetworkClient
object.

> 
> Maybe I did not make it clear. I am not suggesting anything relying on
> TCP or transport layer. Everything is handled at application layer. From the
> clients perspective, the timeout is not defined as TCP timeout, it is
> defined as the upper bound of time it will wait before receiving a
> response. If the client did not receive a response before the timeout is
> reached, it will just retry. My suggestion was that as long as a
> FetchRequest needs to be retried, no matter for what reason, we just use
> a full FetchRequest. This does not depend on NetworkClient implementations,
> i.e. regardless of whether the retry is on the existing TCP connection or
> a new one.

So, with this proposal, if the TCP session drops, then the client needs
to retransmit, right?  That's why I said this proposal couples the TCP
session with the incremental fetch session.  In general, I don't see why
you would want to couple those two things.

If the network is under heavy load, it might cause a few TCP sessions to
drop.  If a dropped TCP session means that someone has to fall back to
sending a much larger full fetch request, that's a positive feedback
loop.  It could lead to congestion collapse.

In general, I think that the current KIP proposal, which allows an
incremental fetch session to persist across multiple TCP sessions, is
superior to a proposal which doesn't allow that.  It also avoids
worrying about message reordering within the server due to multiple
worker threads and delayed requests.  It's just simpler, easier, and
more efficient to have the sequence number than to not have it.

> 
> The question we are trying to answer here is essentially how to let the
> leader and followers agree on the messages in the log. And we are
> comparing
> the following two solutions:
> 1. Use something like a TCP ACK with epoch at Request/Response level.
> 2. Piggy back the leader knowledge at partition level for the follower to
> confirm.

The existing KIP proposal is not really similar to a TCP ACK.  A TCP ACK
involves sending back an actual ACK packet.  The KIP-227 proposal just
has an incrementing sequence number which the client increments each
time it successfully receives a response.

> 
> Personally I think (2) is better because (2) is more direct. The leader
> is the one who maintains all the state (LEOs) of the followers. At the end
> of the day, the leader just wants to make sure all those states are correct.
> (2) directly confirms those states with the followers instead of
> inferring that from an epoch.

The problem is that when using incremental updates, we can't "directly
confirm" that the follower and the leader are in sync.  For example,
suppose the follower loses a response which gives an update for some
partition.  Then, the partition is not changed after that.  The follower
has no way of knowing that that data is missing, just by looking at the
responses.  That's why it is so important to keep the follower and the
leader in lockstep by using the sequence number.
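
(A toy model of that lockstep check, not the actual KIP-227 code: the leader only treats a
request as incremental when the epoch is exactly the one it expects, and anything else
degrades to a full fetch, so a lost response can never be silently skipped over.)

    // Toy model only; the real proposal lives in KIP-227.
    final class IncrementalFetchSessionSketch {

        enum Interpretation { INCREMENTAL, FULL }

        private int expectedEpoch = 1; // epoch the leader expects in the next incremental fetch

        Interpretation handleFetch(final int requestEpoch) {
            if (requestEpoch == expectedEpoch) {
                // Seeing epoch N + 1 tells the leader that the response to epoch N
                // was processed, so it is safe to send only changed partitions.
                expectedEpoch++;
                return Interpretation.INCREMENTAL;
            }
            // A mismatch (e.g. a retried request after a lost response) means the
            // leader cannot assume anything was received, so it falls back to a
            // full fetch rather than risk omitting a changed partition.
            return Interpretation.FULL;
        }
    }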

> Note that there is a subtle but maybe important
> difference between our use case of epoch and TCP seq. The difference is
> that a TCP ACK confirms all the packets with a lower seq have been
> received.
> In our case, a high epoch request does not mean all the data in the
> previous response was successful. So in the KIP, the statement of "When
> the leader receives a fetch request with epoch N + 1, it knows that the data
> it sent back for the fetch request with epoch N was successfully processed
> by the follower." could be tricky or expensive to make right in some cases.

Hmm, let me 

Re: [VOTE] KIP-179: Change ReassignPartitionsCommand to use AdminClient

2017-12-05 Thread Tom Bentley
I am withdrawing this KIP. See the [DISCUSS] thread for the reasons why.

On 7 November 2017 at 08:15, Tom Bentley  wrote:

> Hi,
>
> I would like to start a vote on KIP-179 which would add an AdminClient API
> for partition reassignment and interbroker replication throttling.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+Change+
> ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>


Re: [DISCUSS] KIP-179: Change ReassignPartitionsCommand to use AdminClient

2017-12-05 Thread Tom Bentley
Hi all,

I've been thinking about the proposed changes in KIP-179 and, on reflection,
I don't think the API presented is really ideal. Some of the limitations it
has include:

1. It sticks to the current, batch-oriented model (i.e. a single set of
reassignments at a time).
2. It still doesn't really provide a nice way of knowing that a
reassignment is complete.
3. As presented, the automatic removal of throttles only happens at the end
of the reassignment batch. But individual brokers could be unthrottled
before then.

As an illustration of this, https://issues.apache.org/jira/browse/KAFKA-6304
provides a use case for wanting to cancel a reassignment because one of the
brokers in the new assignment has failed. With the proposed API:

1. We can't identify the subset of the reassignment batch which we want to
   cancel.
2. All we could do would be to revise the proposed API to allow calling
   reassignPartitions() while a reassignment was in progress. This second
   call could revert the subset of reassignments involving the failed broker.
3. But the API has no way to express that the original reassignment was
   cancelled.

Another illustration of the problem: An advanced cluster balancer
(such as LinkedIn's cruise control) has to batch large reassignments
(partly so as to make cancellation easier). This batching itself leads to
inefficiency because some of the partitions in the batch will finish before
others, so time is wasted with the cluster only moving a small number of
partitions (when most in the batch have finished).

In hindsight, I think I was too influenced by reproducing what the
kafka-reassign-partitions.sh tool does today. I think what's actually needed
(for things like cruise control) is an API that's more fine-grained and
less batch-oriented. I am therefore withdrawing KIP-179 and
intend to start a new KIP to propose a different API for partition
reassignment.

I'm still interested in hearing about other deficiencies of the KIP-179
proposal,
so I can avoid them in the new proposal. Similarly, if there are features
you'd like to see in the API, please let me know.

I won't go into the details of the new API here, but the basic idea I'd like
to use is to give the reassignment of each partition an identity (though this
wouldn't be exposed directly in the API). This is necessary to allow new
reassignments to be added while some are already running. API methods would
then be provided to discover all the currently running reassignments,
determine if a reassignment is still running, etc.
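
(Purely as an illustration of the shape such a fine-grained API might take, with every
name invented; this is not a concrete proposal yet.)

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.CompletionStage;

    import org.apache.kafka.common.TopicPartition;

    // Invented names, illustration only: a per-partition, non-batch view of
    // reassignment where running reassignments can be listed, inspected and
    // cancelled independently of each other.
    interface PartitionReassignmentsSketch {

        /** Start (or add to) reassignments for the given partitions. */
        CompletionStage<Void> reassign(Map<TopicPartition, Collection<Integer>> targetReplicas);

        /** All reassignments currently in progress. */
        CompletionStage<Collection<TopicPartition>> listRunningReassignments();

        /** Whether the reassignment of a single partition is still running. */
        CompletionStage<Boolean> isReassignmentRunning(TopicPartition partition);

        /** Cancel a subset of running reassignments, e.g. those involving a failed broker. */
        CompletionStage<Void> cancelReassignments(Collection<TopicPartition> partitions);
    }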

Cheers,

Tom

On 1 November 2017 at 10:20, Tom Bentley  wrote:

> This thread has been very quiet for a while now. It's unclear whether this
> is because no one has anything more to say, or whether no one has taken a
> look at it in its current form. I suspect the latter, so I'm not calling
> the vote today, but instead asking for more review.
>
> What's currently proposed – in addition to the reassignPartitions() API
> itself – is to have a pair of RPCs for managing throttles. This is quite
> different from the earlier proposal to reuse alterConfigs(). The benefits
> of this specific API include:
>
> * being more typesafe,
> * allowing for the automatic removal of throttles when reassignment has
> completed,
> * being careful about correct management of the throttles wrt controller
> failover
>
> Surely someone has something to say about this, before we reach the vote
> stage?
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
> Change+ReassignPartitionsCommand+to+use+AdminClient
>
> Thanks,
>
> Tom
>
>
> On 25 October 2017 at 10:33, Tom Bentley  wrote:
>
>> If there are no further comments, I will start a vote on this next week.
>>
>> Thanks,
>>
>> Tom
>>
>> On 20 October 2017 at 08:33, Tom Bentley  wrote:
>>
>>> Hi,
>>>
>>> I've made a fairly major update to KIP-179 to propose APIs for setting
>>> throttled rates and throttled replicas with the ability to remove these
>>> automatically at the end of reassignment.
>>>
>>> I'd be grateful for your feedback:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-179+-+
>>> Change+ReassignPartitionsCommand+to+use+AdminClient
>>>
>>> Thanks,
>>>
>>> Tom
>>>
>>> On 2 October 2017 at 13:15, Tom Bentley  wrote:
>>>
 One question I have is about whether/how to scope throttling to a
 reassignment. Currently throttles are only loosely associated with
 reassignment: You can start a reassignment without any throttling, add
 throttling to an in-flight reassignment, and remember/forget to remove
 throttling after the reassignment is complete. There's great flexibility
 in that, but also the risk that you forget to remove the throttle(s).

 Just adding an API for setting the throttled rate makes this situation
 worse: While it's nice to be able to auto-remove the throttled rate, what
 about the config for the throttled replicas? 

Re: [DISCUSS]KIP-235 DNS alias and secured connections

2017-12-05 Thread Tom Bentley
Hi Jonathan,

It might be worth mentioning in the KIP that this is necessary only for
*Kerberos* on SASL, and not other SASL mechanisms. Reading the JIRA it
makes sense, but I was confused up until that point.

Cheers,

Tom

On 5 December 2017 at 17:53, Skrzypek, Jonathan 
wrote:

> Hi,
>
> I would like to discuss a KIP I've submitted :
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 235%3A+Add+DNS+alias+support+for+secured+connection
>
> Feedback and suggestions welcome !
>
> Regards,
> Jonathan Skrzypek
> Middleware Engineering
> Messaging Engineering
> Goldman Sachs International
> Christchurch Court - 10-15 Newgate Street
> London EC1A 7HD
> Tel: +442070512977
>
>


Re: Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-05 Thread Khurrum Nasim
Hi Marina,


On Tue, Dec 5, 2017 at 6:58 AM, Marina Popova 
wrote:

> Hi,
> I don't think it would be such a great idea to start modifying the very
> foundation of Kafka's design to accommodate more and more extra use cases.
> Kafka became so widely adopted and popular because its creator made a
> brilliant decision to make it a "dumb broker - smart consumer" type of
> system, where there are no to minimal dependencies between Kafka brokers and
> Consumers. This is what makes Kafka blazingly fast and truly scalable - able
> to handle thousands of Consumers with no impact on performance.
>

I am not sure I agree with this. I think from an end-user perspective, what
users expect is an ultra simple streaming/messaging system: applications
send messages, messaging systems store and dispatch them, consumers
consume the messages and tell the systems that they already consumed the
messages. IMO whether management is centralized or decentralized
doesn't really matter here if kafka is able to do things without impacting
performance.

Sometimes people assume that smarter brokers (like traditional messaging
brokers) cannot offer high throughput and scalability, because they do
"too many things". But I took a look at the Pulsar documentation and their
presentation. There are a few metrics that are very impressive:

https://image.slidesharecdn.com/apachepulsar-171113225233/95/bdam-multitenant-and-georeplication-messaging-with-apache-pulsar-by-matteo-merli-sijie-guo-from-streamlio-2-638.jpg?cb=1510613990

- 1.8 million messages/second per topic partition
- 99pct producing latency less than 5ms with stronger durability
- support millions of topics
- it also supports at-least-once and effectively-once producing

Those metrics sound appealing to me if pulsar supports both streaming and
queuing. I am wondering if anyone in the community has tried to do
performance testing or a benchmark between Pulsar and Kafka. I would love to
see such results, which could help people understand both systems' pros and
cons.


- KN



>
> One unfortunate consequence of becoming so popular - is that more and more
> people are trying to fit Kafka into their architectures not because it
> really fits, but because everybody else is doing so :) And this causes many
> requests to support richer and richer functionality to be added to
> Kafka - like transactional messages, more complex acks, centralized
> consumer management, etc.
>
> If you really need those features - there are other systems that are
> designed for that.
>
> I truly worry that if all those changes are added to Core Kafka - it will
> become just another "do it all" enterprise-level monster that will be able
> to do it all, but at the price of mediocre performance and ten-fold increased
> complexity (and, thus, management and possibility of bugs). Sure, there has
> to be innovation and new features added - but maybe those that require
> major changes to Kafka's core principles should go into separate
> frameworks, plug-ins (like Connectors) or something along those lines, rather
> than packing it all into Core Kafka.
>
> Just my 2 cents :)
>
> Marina
>
> Sent with [ProtonMail](https://protonmail.com) Secure Email.
>
> >  Original Message 
> > Subject: Re: Comparing Pulsar and Kafka: unified queuing and streaming
> > Local Time: December 4, 2017 2:56 PM
> > UTC Time: December 4, 2017 7:56 PM
> > From: ja...@confluent.io
> > To: dev@kafka.apache.org
> > Kafka Users 
> >
> > Hi Khurrum,
> >
> > Thanks for sharing the article. I think one interesting aspect of Pulsar
> > that stands out to me is its notion of a subscription and how it impacts
> > message retention. In Kafka, consumers are more loosely coupled and
> > retention is enforced independently of consumption. There are some
> > scenarios I can imagine where the tighter coupling might be beneficial.
> For
> > example, in Kafka Streams, we often use intermediate topics to store the
> > data in one stage of the topology's computation. These topics are
> > exclusively owned by the application and once the messages have been
> > successfully received by the next stage, we do not need to retain them
> > further. But since consumption is independent of retention, we either
> have
> > to choose a large retention time and deal with some temporary storage
> waste
> > or we use a low retention time and possibly lose some messages during an
> > outage.
> >
> > We have solved this problem to some extent in Kafka by introducing an API
> > to delete the records in a partition up to a certain offset, but this
> > effectively puts the burden of this use case on clients. It would be
> > interesting to consider whether we could do something like Pulsar in the
> > Kafka broker. For example, we have a 

[DISCUSS]KIP-235 DNS alias and secured connections

2017-12-05 Thread Skrzypek, Jonathan
Hi,

I would like to discuss a KIP I've submitted :
https://cwiki.apache.org/confluence/display/KAFKA/KIP-235%3A+Add+DNS+alias+support+for+secured+connection

Feedback and suggestions welcome !

Regards,
Jonathan Skrzypek
Middleware Engineering
Messaging Engineering
Goldman Sachs International
Christchurch Court - 10-15 Newgate Street
London EC1A 7HD
Tel: +442070512977



Re: [DISCUSS] KIP-210: Provide for custom error handling when Kafka Streams fails to produce

2017-12-05 Thread Matt Farmer
I have updated this KIP accordingly.

Can you please take a look and let me know if what I wrote looks correct to
you?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce

Thanks!

Matt


On December 4, 2017 at 9:39:13 PM, Matt Farmer (m...@frmr.me) wrote:

Hey Matthias, thanks for getting back to me.

That's fine. But if we add it to `test` package, we don't need to talk
about it in the KIP. `test` is not public API.

Yes, that makes sense. It was in the KIP originally because I was, at one
point, planning on including it. We can remove it now that we’ve decided we
won’t include it in the public API.

Understood. That makes sense. We should explain this clearly in the KIP
and maybe log all other following exceptions at DEBUG level?


I thought it was clear in the KIP, but I can go back and double check my
wording and revise it to try and make it clearer.

I’ll take a look at doing more work on the KIP and the Pull Request
tomorrow.

Thanks again!

On December 4, 2017 at 5:50:33 PM, Matthias J. Sax (matth...@confluent.io)
wrote:

Hey,

About your questions:

>>> Acknowledged, so is ProducerFencedException the only kind of exception I
>>> need to change my behavior on? Or are there other types I need to
check? Is
>>> there a comprehensive list somewhere?

I cannot think if any other atm. We should list all fatal exceptions for
which we don't call the handler and explain why (exception is "global"
and will affect all other records, too | ProducerFenced is self-healing).

We started to collect and categorize exception here (not completed yet):
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Architecture#KafkaStreamsArchitecture-TypesofExceptions
:

This list should be a good starting point though.

> I include it in the test package because I have tests that assert that if
> the record collector impl encounters an Exception and receives a CONTINUE
> that it actually does CONTINUE.

That's fine. But if we add it to `test` package, we don't need to talk
about it in the KIP. `test` is not public API.

> I didn't want to invoke the handler in places where the CONTINUE or FAIL
> result would just be ignored. Presumably, after a FAIL has been returned,
> subsequent exceptions are likely to be repeats or noise from my
> understanding of the code paths. If this is incorrect we can revisit.

Understood. That makes sense. We should explain this clearly in the KIP
and maybe log all other following exceptions at DEBUG level?


-Matthias
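
(For readers skimming the thread, a minimal sketch of the kind of handler being discussed.
The one-method shape, the CONTINUE/FAIL responses and the exclusion of fatal exceptions are
taken from this discussion; the exact interface and response type are defined in the KIP,
not here.)

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch only: assumes a one-method handler that receives the failed record
    // and the exception and answers CONTINUE or FAIL.
    public class LogAndContinueProductionExceptionHandlerSketch {

        public enum Response { CONTINUE, FAIL }

        private static final Logger log =
                LoggerFactory.getLogger(LogAndContinueProductionExceptionHandlerSketch.class);

        public Response handle(final ProducerRecord<byte[], byte[]> record,
                               final Exception exception) {
            // Log the failure and keep processing. Fatal exceptions such as
            // ProducerFencedException are handled by Streams itself and never
            // reach the handler, per the discussion above.
            log.warn("Failed to produce record to topic {}: {}", record.topic(), exception.toString());
            return Response.CONTINUE;
        }
    }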


On 12/1/17 11:43 AM, Matt Farmer wrote:
> Bump! It's been three days here and I haven't seen any further feedback.
> Eager to get this resolved, approved, and merged. =)
>
> On Tue, Nov 28, 2017 at 9:53 AM Matt Farmer  wrote:
>
>> Hi there, sorry for the delay in responding. Last week had a holiday and
>> several busy work days in it so I'm just now getting around to
responding.
>>
>>> We would only exclude
>>> exception Streams can handle itself (like ProducerFencedException) --
>>> thus, if the handler has code to react to this, it would not be bad, as
>>> this code is just never called.
>> [...]
>>> Thus, I am still in favor of not calling the ProductionExceptionHandler
>>> for fatal exception.
>>
>> Acknowledged, so is ProducerFencedException the only kind of exception I
>> need to change my behavior on? Or are there other types I need to check?
Is
>> there a comprehensive list somewhere?
>>
>>> About the "always continue" case. Sounds good to me to remove it (not
>>> sure why we need it in test package?)
>>
>> I include it in the test package because I have tests that assert that if
>> the record collector impl encounters an Exception and receives a CONTINUE
>> that it actually does CONTINUE.
>>
>>> What is there reasoning for invoking the handler only for the first
>>> exception?
>>
>> I didn't want to invoke the handler in places where the CONTINUE or FAIL
>> result would just be ignored. Presumably, after a FAIL has been returned,
>> subsequent exceptions are likely to be repeats or noise from my
>> understanding of the code paths. If this is incorrect we can revisit.
>>
>> Once I get the answers to these questions I can make another pass on the
>> pull request!
>>
>> Matt
>>
>> On Mon, Nov 20, 2017 at 4:07 PM Matthias J. Sax 
>> wrote:
>>
>>> Thanks for following up!
>>>
>>> One thought about an older reply from you:
>>>
>>> I strongly disagree here. The purpose of this handler isn't *just*
to
>>> make a decision for streams. There may also be desirable side
>>> effects that
>>> users wish to cause when production exceptions occur. There may be
>>> side-effects that they wish to cause when AuthenticationExceptions
>>> occur,
>>> as well. We should still give them the hooks to preform those side
>>> effects.
>>>
>>> And your follow up:
>>>
> - I think I would rather invoke it for all exceptions that could
>>> occur
> 

Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Ted Yu
In KeepTimestampOnInvalidTimestamp, there should be a check that the timestamp
is < 0.
This would protect against future changes to the onInvalidTimestamp() callback.

Wednesday, December 31, 1969 11:59:59 PM UTC was in the past. Can you
enrich the Motivation section on why the proposal is made (writing data
generated nowadays wouldn't result in a negative timestamp)?

In the Compatibility section, there are two questions without answers.
Are you going to fill them out later?

Cheers

On Tue, Dec 5, 2017 at 8:40 AM, Konstantin Chukhlomin 
wrote:

> Hi all,
>
> I have created a KIP to support negative timestamp:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support
>
> Here are proposed changes: https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk
>
> I'm pretty sure that not all cases are covered, so comments and suggestions
> are welcome.
>
> Thank you,
> Konstantin


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-05 Thread Jason Gustafson
Hi Colin,

Thanks for the response. A couple replies:


> I’m a bit ambivalent about letting the client choose the session
> timeout.  What if clients choose timeouts that are too long? Hmm
> I do agree the timeout should be sized proportional to
> max.poll.interval.ms.


We have solved this in other cases by letting the broker enforce a maximum
timeout. After thinking about it a bit, it's probably overkill in this case
since the caching is just an optimization. Instead of stressing over
timeouts and such, I am actually wondering if we just need a reasonable
session cache eviction policy. For example, when the number of slots is
exceeded, perhaps you evict the session with the fewest partitions or the
one with the largest interval between fetches. We could give priority to
the replicas. Perhaps this might let us get rid of a few of the configs.

The main reason is if there is a bug in the incremental fetch feature.
>

This was in response to my question about removing the consumer config. And
sure, any new feature may have bugs, but that's what we have testing for
;). Users can always fall back to a previous version if there are any major
problems. As you know, it's tough removing configs once they are there, so
I think we should try to add them only if they make sense in the long term.

Thanks,
Jason

On Mon, Dec 4, 2017 at 11:06 PM, Colin McCabe  wrote:

> On Mon, Dec 4, 2017, at 02:27, Jan Filipiak wrote:
> >
> >
> > On 03.12.2017 21:55, Colin McCabe wrote:
> > > On Sat, Dec 2, 2017, at 23:21, Becket Qin wrote:
> > >> Thanks for the explanation, Colin. A few more questions.
> > >>
> > >>> The session epoch is not complex.  It's just a number which
> increments
> > >>> on each incremental fetch.  The session epoch is also useful for
> > >>> debugging-- it allows you to match up requests and responses when
> > >>> looking at log files.
> > >> Currently each request in Kafka has a correlation id to help match the
> > >> requests and responses. Is epoch doing something differently?
> > > Hi Becket,
> > >
> > > The correlation ID is used within a single TCP session, to uniquely
> > > associate a request with a response.  The correlation ID is not unique
> > > (and has no meaning) outside the context of that single TCP session.
> > >
> > > Keep in mind, NetworkClient is in charge of TCP sessions, and generally
> > > tries to hide that information from the upper layers of the code.  So
> > > when you submit a request to NetworkClient, you don't know if that
> > > request creates a TCP session, or reuses an existing one.
> > >>> Unfortunately, this doesn't work.  Imagine the client misses an
> > >>> increment fetch response about a partition.  And then the partition
> is
> > >>> never updated after that.  The client has no way to know about the
> > >>> partition, since it won't be included in any future incremental fetch
> > >>> responses.  And there are no offsets to compare, since the partition
> is
> > >>> simply omitted from the response.
> > >> I am curious about in which situation would the follower miss a
> response
> > >> of a partition. If the entire FetchResponse is lost (e.g. timeout),
> the
> > >> follower would disconnect and retry. That will result in sending a
> full
> > >> FetchRequest.
> > > Basically, you are proposing that we rely on TCP for reliable delivery
> > > in a distributed system.  That isn't a good idea for a bunch of
> > > different reasons.  First of all, TCP timeouts tend to be very long.
> So
> > > if the TCP session timing out is your error detection mechanism, you
> > > have to wait minutes for messages to timeout.  Of course, we add a
> > > timeout on top of that after which we declare the connection bad and
> > > manually close it.  But just because the session is closed on one end
> > > doesn't mean that the other end knows that it is closed.  So the leader
> > > may have to wait quite a long time before TCP decides that yes,
> > > connection X from the follower is dead and not coming back, even though
> > > gremlins ate the FIN packet which the follower attempted to translate.
> > > If the cache state is tied to that TCP session, we have to keep that
> > > cache around for a much longer time than we should.
> > Hi,
> >
> > I see this from a different perspective. The cache expiry time
> > has the same semantic as idle connection time in this scenario.
> > It is the time range we expect the client to come back an reuse
> > its broker side state. I would argue that on close we would get an
> > extra shot at cleaning up the session state early. As opposed to
> > always wait for that duration for expiry to happen.
>
> Hi Jan,
>
> The idea here is that the incremental fetch cache expiry time can be
> much shorter than the TCP session timeout.  In general the TCP session
> timeout is common to all TCP connections, and very long.  To make these
> numbers a little more concrete, the TCP session timeout is often
> configured to be 2 hours on Linux.  (See
> 

[DISCUSS] KIP-228 Negative record timestamp support

2017-12-05 Thread Konstantin Chukhlomin
Hi all,

I have created a KIP to support negative timestamp:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-228+Negative+record+timestamp+support

Here are proposed changes:
https://github.com/apache/kafka/compare/trunk...chuhlomin:trunk

I'm pretty sure that not all cases are covered, so comments and suggestions are
welcome.

Thank you,
Konstantin

Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-12-05 Thread Bill Bejeck
Matthias,

Overall I agree with what you've presented here.

Initially, I was hesitant to remove information from the context of the
result records (Joins or Aggregations) with the thought that when there are
unexpected results, the source information would be useful for tracing back
where the error could have occurred.  But in the case of Joins and
Aggregations, the amount of data needed to do meaningful analysis could be
too much. For example, a join result could come from two topics so you'd
need to keep both original topic names, offsets, etc. (plus the broker
could have deleted the records in the interim so even having the offset could
provide nothing).

I'm a bit long-winded here, but I've come full circle to your original
proposal that since Joins and Aggregations produce fundamentally new types,
we drop the corresponding information from the context even in the case of
single topic aggregations.

Thanks,
Bill
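
(To make the contract being converged on a bit more tangible, a purely illustrative
sketch with invented names; this is not the KIP-159 API.)

    // Invented names, illustration only: what a record context could expose for
    // derived records, dropping the fields that have no meaning for the result.
    interface RecordContextSketch {

        /** Preserved through aggregations and joins: all inputs share the partition. */
        int partition();

        /** Null for joins and multi-topic aggregations, where no single source topic exists. */
        String topic();

        /** -1 for aggregation and join results, where an offset has no semantic meaning. */
        long offset();

        /** Always defined by the existing timestamp contract. */
        long timestamp();
    }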

On Mon, Dec 4, 2017 at 7:02 PM, Matthias J. Sax 
wrote:

> I agree with Guozhang that just exposing meta data at the source level
> might not provide too much value. Furthermore, for timestamps we do
> already have a well defined contract and we should exploit it:
> timestamps can always be provided in a meaningful way.
>
> Also, for simple operations like KStream-filter/map the contract is
> simple and we can just use it. Same for KTable-filter/map (for new values).
>
> For aggregations, joins, and oldValue, I could just drop some information
> and return `null`/-1 if the result records have no semantically
> meaningful metadata.
>
> For example, for aggregations, we could preserve the partition (as all
> agg-input-records have the same partition). For single-input-topic
> aggregation (which I guess is the most prominent case), we can also carry
> over the topic name (often an internal repartitioning topic name).
> Offsets don't have any semantic interpretation IMHO and we could
> return -1.
>
> For joins, we could keep the partition information. Topic and offset are
> both unknown/invalid for the output record IMHO.
>
> For the oldValue case, we can keep the partition and, for the single-input-topic
> case, the topic name. Timestamp might be -1 for now, but after we add
> timestamps to KTable (which we plan to do anyway), we can also return a
> valid timestamp. Offset would be -1 again (if we store offsets in KTable
> too, we could provide all offsets as well -- but I don't see too much
> value in doing this compared to the storage overhead this implies).
>
>
> WDYT?
>
>
> -Matthias
>
> On 11/29/17 4:14 AM, Jan Filipiak wrote:
> > Hi,
> >
> > thank you for the summary and thanks for acknowledging that I do have a
> > point here.
> >
> > I don't like the second idea at all. Hence I started off this discussion.
> >
> > I am just disappointed; back then, when we had the discussion about how
> > to refactor the store overloads
> > and IQ handling, I knew the path we were taking was wrong. Having problems
> > implementing these kinds of
> > features (which are really simple) is just a symptom of a messed-up IQ
> > implementation. I really wish
> > I could have convinced you guys back then. To be honest, with IQ we can
> > continue here
> > as we materialize but would not send oldValue, but with join you're out
> > of luck with the current setup.
> >
> > I of course recommend not introducing any optimizations here. I'd
> > recommend going towards what
> > I recommended back then. So I wouldn't say we need to optimize
> > anything later; we need to build
> > the topology better in the first place.
> >
> >
> >
> >
> > On 28.11.2017 21:00, Guozhang Wang wrote:
> >> Jan,
> >>
> >> Thanks for your input. I can understand now that the oldValue is also
> >> exposed in the user-customized `filter` function, and hence what record
> >> context we should expose is a problem. And I think it does bring a good point
> >> to consider for KIP-159. The discussion may be a bit confusing to readers
> >> though, and hence I'd like to summarize the status quo along with a
> >> proposal:
> >>
> >> In today's Streams DSL, when a KTable is created either from a source
> >> topic or from a stateful operator, we will materialize the KTable with a
> >> backing state store; on the other hand, KTables created from a non-stateful
> >> operator like filter will not be backed by a state store by default unless
> >> users indicate so (e.g. using the overloaded function with the queryable
> >> name or store supplier).
> >>
> >> For example:
> >>
> >> KTable table1 = builder.table("topic");             // a state store created for table1
> >> KTable table2 = table1.filter(..);                  // no state store created for table2
> >> KTable table3 = table1.filter(.., "storeName");     // a state store created for table3
> >> KTable table4 = table1.groupBy(..).aggregate(..);   // a state store created for table4
> >>
> >> Because of that, the filter() operator above on table1 will always 
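To ground the discussion, here is a minimal sketch (not from the thread) showing where this record context surfaces today via the Processor API; under the contract Matthias describes above, a record produced by a join or aggregation would keep a meaningful partition() but could report null for topic() and -1 for offset():

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;

public class RecordContextExample {

    // Logs the record context that the ProcessorContext exposes for each record.
    public static class MetadataLoggingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(final String key, final String value) {
            // For records read straight from a source topic all four values are well defined.
            System.out.printf("topic=%s partition=%d offset=%d timestamp=%d key=%s%n",
                    context().topic(), context().partition(), context().offset(),
                    context().timestamp(), key);
            context().forward(key, value);
        }
    }

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> stream = builder.stream("input-topic");
        stream.process(MetadataLoggingProcessor::new);
        // builder.build() would then be handed to a KafkaStreams instance as usual.
    }
}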

Re: Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-05 Thread Marina Popova
Hi,
I don't think it would be such a great idea to start modifying the very 
foundation of Kafka's design to accommodate more and more extra use cases.
Kafka became so widely adopted and popular because its creators made a 
brilliant decision to make it a "dumb broker - smart consumer" type of 
system, where there are no to minimal dependencies between Kafka brokers and 
consumers. This is what makes Kafka blazingly fast and truly scalable - able to 
handle thousands of consumers with no impact on performance.

One unfortunate consequence of becoming so popular is that more and more 
people are trying to fit Kafka into their architectures not because it really 
fits, but because everybody else is doing so :) And this causes many requests 
for richer and richer functionality to be added to Kafka - like 
transactional messages, more complex acks, centralized consumer management, etc.

If you really need those features - there are other systems that are designed 
for that.

I truly worry that if all those changes are added to Core Kafka - it will 
become just another "do it all" enterprise-level monster that will be able to 
do it all, but at the price of mediocre performance and ten-fold increased 
complexity (and, thus, management overhead and the possibility of bugs). Sure, there has to 
be innovation and new features added - but maybe those that require major 
changes to Kafka's core principles should go into separate frameworks, 
plug-ins (like Connectors) or something along those lines, rather than packing it 
all into Core Kafka.

Just my 2 cents :)

Marina

Sent with [ProtonMail](https://protonmail.com) Secure Email.

>  Original Message 
> Subject: Re: Comparing Pulsar and Kafka: unified queuing and streaming
> Local Time: December 4, 2017 2:56 PM
> UTC Time: December 4, 2017 7:56 PM
> From: ja...@confluent.io
> To: dev@kafka.apache.org
> Kafka Users 
>
> Hi Khurrum,
>
> Thanks for sharing the article. I think one interesting aspect of Pulsar
> that stands out to me is its notion of a subscription and how it impacts
> message retention. In Kafka, consumers are more loosely coupled and
> retention is enforced independently of consumption. There are some
> scenarios I can imagine where the tighter coupling might be beneficial. For
> example, in Kafka Streams, we often use intermediate topics to store the
> data in one stage of the topology's computation. These topics are
> exclusively owned by the application and once the messages have been
> successfully received by the next stage, we do not need to retain them
> further. But since consumption is independent of retention, we either have
> to choose a large retention time and deal with some temporary storage waste
> or we use a low retention time and possibly lose some messages during an
> outage.
>
> We have solved this problem to some extent in Kafka by introducing an API
> to delete the records in a partition up to a certain offset, but this
> effectively puts the burden of this use case on clients. It would be
> interesting to consider whether we could do something like Pulsar in the
> Kafka broker. For example, we have a consumer group coordinator which is
> able to track the progress of the group through its committed offsets. It
> might be possible to extend it to automatically delete records in a topic
> after offsets are committed if the topic is known to be exclusively owned
> by the consumer group. We already have the DeleteRecords API that we need, so
> maybe this is "just" a matter of some additional topic metadata. I'd be
> interested to hear whether this kind of use case is common among our users.
>
> -Jason
>
> On Sun, Dec 3, 2017 at 10:29 PM, Khurrum Nasim khurrumnas...@gmail.com
> wrote:
>
>> Dear Kafka Community,
>> I happened to read this blog post comparing the messaging model between
>> Apache Pulsar and Apache Kafka. It sounds interesting. Apache Pulsar claims
>> to unify streaming (kafka) and queuing (rabbitmq) in one unified API.
>> Pulsar also seems to support Kafka API. Have anyone taken a look at Pulsar?
>> How does the community think about this? Pulsar is also an Apache project.
>> Is there any collaboration can happen between these two projects?
>> https://streaml.io/blog/pulsar-streaming-queuing/
>> BTW, I am a Kafka user, loving Kafka a lot. Just try to see what other
>> people think about this.
>>
>> - KN
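To make Jason's suggestion above concrete, here is a minimal sketch (not from the thread) of how an application that exclusively owns a topic can already trim it by pairing its committed offsets with the DeleteRecords API. It assumes the Java AdminClient#deleteRecords() call from KIP-204, which may not be available in every release at the time of this discussion:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.stream.Collectors;

public final class OwnedTopicTrimmer {

    // Delete everything below the offsets the owning consumer group has already committed.
    public static void trimToCommitted(final AdminClient admin,
                                       final Map<TopicPartition, OffsetAndMetadata> committed)
            throws Exception {
        final Map<TopicPartition, RecordsToDelete> toDelete = committed.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                        entry -> RecordsToDelete.beforeOffset(entry.getValue().offset())));
        admin.deleteRecords(toDelete).all().get(); // waits until the brokers have advanced the log start offsets
    }
}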

Re: [DISCUSS] KIP-233: Simplify StreamsBuilder#addGlobalStore

2017-12-05 Thread Bill Bejeck
Hi Panuwat,

Thanks for the KIP, overall looks good to me.

I want to play the devil's advocate for a second and ask: do we want to keep
the older method with the extra parameters, or deprecate it?

Although ATM I can't think of a good reason to keep the old method with the
extra parameters.

Thanks,
Bill

On Tue, Dec 5, 2017 at 5:48 AM, Ted Yu  wrote:

> Fine by me.
>
> On Tue, Dec 5, 2017 at 2:45 AM, Panuwat Anawatmongkhon <
> panuwat.anawatmongk...@gmail.com> wrote:
>
> > Thank you, Matthias.
> >
> > Ted,
> > How about this.
> >
> > String globalTopicName = "testGlobalTopic";
> > String globalStoreName = "testAddGlobalStore";
> > final StreamsBuilder builder = new StreamsBuilder();
> > final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
> > EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
> > EasyMock.replay(globalStoreBuilder);
> > builder.addGlobalStore(globalStoreBuilder, globalTopicName, new ConsumedInternal(), new MockProcessorSupplier());
> >
> >
> >
> >
> > On Tue, Dec 5, 2017 at 4:58 AM, Matthias J. Sax 
> > wrote:
> >
> > > Panuwat,
> > >
> > > Thanks a lot for the KIP!
> > >
> > > Just one nit: `does not follow provide a good` -> spelling: remove
> > > `follow` ?
> > >
> > > Otherwise, looks good to me.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 12/4/17 10:49 AM, Ted Yu wrote:
> > > > Looks like you're implying logic similar to this:
> > > >
> > > > public synchronized GlobalKTable globalTable(final String topic,
> > > >                                              final Consumed consumed) {
> > > >
> > > >
> > > > StreamsBuilder is returned instead of GlobalKTable.
> > > >
> > > >
> > > > Can you add code snippet showing how the new API is used ?
> > > >
> > > >
> > > > On Mon, Dec 4, 2017 at 10:09 AM, Panuwat Anawatmongkhon <
> > > > panuwat.anawatmongk...@gmail.com> wrote:
> > > >
> > > >> What I am thinking right now is using the same approach as
> > > >> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder#
> > > >> globalTable
> > > >>
> > > >> On Mon, 4 Dec 2560 at 23:10 Ted Yu  wrote:
> > > >>
> > > >>> Can you describe how sourceName is inferred based on the new API ?
> > > >>>
> > > >>> Please fill out JIRA number.
> > > >>>
> > > >>> BTW here is the URL for the KIP:
> > > >>>
> > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 233%3A+Simplify+
> > > >> StreamsBuilder%23addGlobalStore
> > > >>>
> > > >>> On Mon, Dec 4, 2017 at 7:39 AM, Panuwat Anawatmongkhon <
> > > >>> panuwat.anawatmongk...@gmail.com> wrote:
> > > >>>
> > >  Hi all,
> > >  I created a KIP.
> > >  https://cwiki.apache.org/confluence/display/KAFKA/
> > KIP233%3A+Simplify+
> > >  StreamsBuilder%23addGlobalStore
> > > 
> > >  Cheers,
> > >  Benz
> > > 
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>
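To complement the test snippet above, here is a hedged sketch of what the simplified call could look like in an application, assuming the public overload ends up mirroring the internal globalTable() approach (store builder, topic, Consumed, processor supplier) and generates the source/processor names internally; the StoreUpdater processor is a placeholder, not part of the KIP:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public final class AddGlobalStoreExample {

    // Placeholder state-update processor that writes every record into the global store.
    static final class StoreUpdater extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore("testAddGlobalStore");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }
    }

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("testAddGlobalStore"),
                        Serdes.String(), Serdes.String()).withLoggingDisabled(),
                "testGlobalTopic",
                Consumed.with(Serdes.String(), Serdes.String()),
                StoreUpdater::new);
    }
}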


[GitHub] kafka pull request #4294: MINOR: Include client-id in client authentication ...

2017-12-05 Thread rajinisivaram
Github user rajinisivaram closed the pull request at:

https://github.com/apache/kafka/pull/4294


---


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2017-12-05 Thread Rajini Sivaram
Hi Colin,

KAFKA-5722 already has an owner, so I didn't want to confuse the two KIPs.
They can be done independently of each other. The goal is to validate every
config at least to the level of validation the broker already performs for
the static configs, and in some cases to a more restrictive level. So a
typo like a file-not-found or class-not-found would definitely fail the
AlterConfigs request (validation is performed by AlterConfigs regardless of
the validateOnly flag). I am working out the additional validation I can
perform as I implement updates for each config. For example,
inter-broker keystore update will not be allowed unless it can be verified
against the currently configured truststore.
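For reference, here is a minimal sketch (not from this thread; the config key and value are illustrative) of how a client can use the existing AdminClient#alterConfigs with validateOnly to get the error feedback discussed here before actually applying a dynamic broker config change:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public final class DynamicConfigValidation {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            final Config update = new Config(Collections.singletonList(
                    new ConfigEntry("ssl.keystore.location", "/var/private/ssl/new.keystore.jks")));
            // validateOnly=true: the broker checks the value and returns an error without applying it.
            admin.alterConfigs(Collections.singletonMap(broker0, update),
                    new AlterConfigsOptions().validateOnly(true)).all().get();
            // A second call without validateOnly would actually apply the change.
        }
    }
}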

On Sat, Dec 2, 2017 at 10:15 PM, Colin McCabe  wrote:

> On Tue, Nov 28, 2017, at 14:48, Rajini Sivaram wrote:
> > Hi Colin,
> >
> > Thank you for reviewing the KIP.
> >
> > *kafka-configs.sh* will be converted to use *AdminClient* under
> > KAFKA-5722.
> > This is targeted for the next release (1.1.0). Under this KIP, we will
> > implement *AdminClient#alterConfigs* for the dynamic configs listed in
> > the KIP. This will include validation of the configs and will return
> > appropriate errors if configs are invalid. Integration tests will also be
> > added using AdminClient. Only the actual conversion of ConfigCommand to
> > use AdminClient will be left to be done under KAFKA-5722.
>
> Hi Rajini,
>
> It seems like there is no KIP yet for KAFKA-5722.  Does it make sense to
> describe the conversion of kafka-configs.sh to use AdminClient in
> KIP-226?  Since the AlterConfigs RPCs already exist, it should be pretty
> straightforward.  This would also let us add some information about how
> errors will be handled, which is pretty important for users.  For
> example, will kafka-configs.sh give an error if the user makes a typo
> when setting a configuration?
>
> >
> > Once KAFKA-5722 is implemented, *kafka-configs.sh* can be used to obtain
> > the current configuration, which can be redirected to a text file to
> create a
> > static *server.properties* file. This should help when downgrading - but
> > it does require brokers to be running. We can also document how to obtain
> > the properties using *zookeeper-shell.sh* while downgrading if brokers
> are
> > down.
> >
> > If we rename properties, we should add the new property to ZK based on
> > the value of the old property when the upgraded broker starts up. But we
> > would probably leave the old property as is. The old property will be
> returned
> > and used as a synonym only as long as the broker is on a version where it
> > is still valid. But it can remain in ZK and be updated if downgrading -
> > it will be up to the user to update the old property if downgrading or
> > delete it if not needed. Renaming properties is likely to be confusing
> in any
> > case even without dynamic configs, so hopefully it will be very rare.
>
> Sounds good.
>
> best,
> Colin
>
> >
> >
> > Rajini
> >
> > On Tue, Nov 28, 2017 at 7:47 PM, Colin McCabe 
> wrote:
> >
> > > Hi Rajini,
> > >
> > > This seems like a nice improvement!
> > >
> > > One thing that is a bit concerning is that, if bin/kafka-configs.sh is
> > > used, there is no  way for the broker to give feedback or error
> > > messages.  The broker can't say "sorry, I can't reconfigure that in
> that
> > > way."  Or even "that configuration property is not reconfigurable in
> > > this version of the software."  It seems like in the examples given,
> > > users will simply set a configuration using bin/kafka-configs.sh, but
> > > then they have to check the broker log files to see if it could
> actually
> > > be applied.  And even if it couldn't be applied, then it still lingers
> > > in ZooKeeper.
> > >
> > > This seems like it would lead to a lot of user confusion, since they
> get
> > > no feedback when reconfiguring something.  For example, there will be a
> > > lot of scenarios where someone finds a reconfiguration command on
> > > Google, runs it, but then it doesn't do anything because the software
> > > version is different.  And there's no error message or feedback about
> > > this.  It just doesn't work.
> > >
> > > To prevent this, I think we should convert bin/kafka-configs.sh to use
> > > AdminClient's AlterConfigsRequest.  This allows us to detect scenarios
> > > where, because of a typo, different software version, or a value of the
> > > wrong type (eg. string vs. int), the given configuration cannot be
> > > applied.  We really should convert kafka-configs.sh to use AdminClient
> > > anyway, for all the usual reasons-- people want to lock down ZooKeeper,
> > > ACLs should be enforced, internal representations should be hidden, we
> > > should support environments where ZK is not exposed, etc. etc.
> > >
> > > Another issue that I see here is, how does this interact with
> downgrade?
> > >  Presumably, if the user downgrades to a version that doesn't support
> > > KIP-226, all 

[GitHub] kafka pull request #4294: MINOR: Include client-id in client authentication ...

2017-12-05 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/4294

MINOR: Include client-id in client authentication failure error messages

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka MINOR-authfailure-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4294.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4294


commit 2c2acfd86e30a8ff3aa142369c017aabf543c721
Author: Rajini Sivaram 
Date:   2017-12-05T12:23:12Z

MINOR: Include client-id in authentication failure error messages




---


Re: [VOTE] KIP-225 - Use tags for consumer “records.lag” metrics

2017-12-05 Thread charly molter
Thanks Jun and Becket!

I think your point about 1.0 vs 2.0 makes sense; I can update the KIP to
reflect this.

What's the process for 2.0 contributions, as I can see that trunk is 1.1 and
there is no 2.x branch?

Here's what I can do:
- Not write the code change until trunk moves to 2.0.
- Write the change but leave the PR open until we start working on 2.0.
- Stall this KIP until 2.0 development starts (IIRC it's pretty soon).
- Do it in a backward compatible way (publish both sets of metrics) and
open a Jira tagged on 2.0 to remove the old metrics.

Let me know what's the right way to go.

Thanks!


On Tue, Dec 5, 2017 at 12:45 AM, Becket Qin  wrote:

> Thanks for the KIP, Charly.
>
> +1. The proposal looks good to me. I agree with Jun that it is better to
> make the metrics consistent with other metrics. That being said, arguably
> this is a backwards incompatible change. Since we are at 1.0, backwards
> incompatible changes are supposed to be in 2.0. Not sure if that is the
> plan or not.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Dec 4, 2017 at 4:20 PM, Jun Rao  wrote:
>
> > Hi, Jiangjie,
> >
> > Since you proposed the original KIP-92, do you want to see if this KIP
> > makes sense?
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Nov 22, 2017 at 2:48 AM, charly molter 
> > wrote:
> >
> > > Hi,
> > >
> > > I would like to start the voting thread for KIP-225.
> > > This KIP proposes to correct some lag metrics emitted by the consumer.
> > >
> > > The KIP wiki is here:
> > > https://cwiki.apache.org/confluence/x/uaBzB
> > >
> > > The discussion thread is here:
> > > http://search-hadoop.com/m/Kafka/uyzND1F33uL19AYx/threaded
> > >
> > > Also could someone assign me to this Jira: KAFKA-5890
> > > 
> > >
> > > Thanks,
> > > --
> > > Charly Molter
> > >
> >
>



-- 
Charly Molter


[jira] [Created] (KAFKA-6310) ConcurrentModificationException when reporting requests-in-flight in producer

2017-12-05 Thread Charly Molter (JIRA)
Charly Molter created KAFKA-6310:


 Summary: ConcurrentModificationException when reporting 
requests-in-flight in producer
 Key: KAFKA-6310
 URL: https://issues.apache.org/jira/browse/KAFKA-6310
 Project: Kafka
  Issue Type: Bug
  Components: metrics, network
Affects Versions: 1.0.0
Reporter: Charly Molter


We are running into an issue really similar to KAFKA-4950.

We have a producer running and a MetricsReporter with a background thread which 
publishes these metrics.

The ConcurrentModificationException happens when calling `InFlightRequests.count()` in one 
thread while a connection or disconnection is happening.
In this case one thread is iterating over the map while another is 
adding to/removing from it, thus causing the exception.

We could potentially fix this with a volatile like in KAFKA-4950.
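For illustration, here is a minimal sketch (not the actual NetworkClient code; names are illustrative) of one way to make the in-flight count safe to read from a metrics thread: maintain it in an AtomicInteger updated on add/complete, so readers never iterate the underlying per-node map:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

final class InFlightCounter<R> {
    private final Map<String, Deque<R>> requests = new HashMap<>(); // touched only by the I/O thread
    private final AtomicInteger total = new AtomicInteger();        // safe for any thread to read

    void add(final String node, final R request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
        total.incrementAndGet();
    }

    R complete(final String node) {
        final R request = requests.get(node).pollLast();
        total.decrementAndGet();
        return request;
    }

    int count() {
        return total.get(); // no iteration over the map, so no ConcurrentModificationException
    }
}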



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-233: Simplify StreamsBuilder#addGlobalStore

2017-12-05 Thread Ted Yu
Fine by me.

On Tue, Dec 5, 2017 at 2:45 AM, Panuwat Anawatmongkhon <
panuwat.anawatmongk...@gmail.com> wrote:

> Thank you, Matthias.
>
> Ted,
> How about this.
>
> String globalTopicName = "testGlobalTopic";
> String globalStoreName = "testAddGlobalStore";
> final StreamsBuilder builder = new StreamsBuilder();
> final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
> EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
> EasyMock.replay(globalStoreBuilder);
> builder.addGlobalStore(globalStoreBuilder, globalTopicName, new ConsumedInternal(), new MockProcessorSupplier());
>
>
>
>
> On Tue, Dec 5, 2017 at 4:58 AM, Matthias J. Sax 
> wrote:
>
> > Panuwat,
> >
> > Thanks a lot for the KIP!
> >
> > Just one nit: `does not follow provide a good` -> spelling: remove
> > `follow` ?
> >
> > Otherwise, looks good to me.
> >
> >
> > -Matthias
> >
> >
> >
> > On 12/4/17 10:49 AM, Ted Yu wrote:
> > > Looks like you're implying logic similar to this:
> > >
> > > public synchronized GlobalKTable globalTable(final String topic,
> > >                                              final Consumed consumed) {
> > >
> > >
> > > StreamsBuilder is returned instead of GlobalKTable.
> > >
> > >
> > > Can you add code snippet showing how the new API is used ?
> > >
> > >
> > > On Mon, Dec 4, 2017 at 10:09 AM, Panuwat Anawatmongkhon <
> > > panuwat.anawatmongk...@gmail.com> wrote:
> > >
> > >> What I am thinking right now is using the same approach as
> > >> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder#
> > >> globalTable
> > >>
> > >> On Mon, 4 Dec 2560 at 23:10 Ted Yu  wrote:
> > >>
> > >>> Can you describe how sourceName is inferred based on the new API ?
> > >>>
> > >>> Please fill out JIRA number.
> > >>>
> > >>> BTW here is the URL for the KIP:
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 233%3A+Simplify+
> > >> StreamsBuilder%23addGlobalStore
> > >>>
> > >>> On Mon, Dec 4, 2017 at 7:39 AM, Panuwat Anawatmongkhon <
> > >>> panuwat.anawatmongk...@gmail.com> wrote:
> > >>>
> >  Hi all,
> >  I created a KIP.
> >  https://cwiki.apache.org/confluence/display/KAFKA/
> KIP233%3A+Simplify+
> >  StreamsBuilder%23addGlobalStore
> > 
> >  Cheers,
> >  Benz
> > 
> > >>>
> > >>
> > >
> >
> >
>


Re: [DISCUSS] KIP-233: Simplify StreamsBuilder#addGlobalStore

2017-12-05 Thread Panuwat Anawatmongkhon
Thank you, Matthias.

Ted,
How about this.

String globalTopicName = "testGlobalTopic";
String globalStoreName = "testAddGlobalStore";
final StreamsBuilder builder = new StreamsBuilder();
final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
EasyMock.replay(globalStoreBuilder);
builder.addGlobalStore(globalStoreBuilder, globalTopicName, new ConsumedInternal(), new MockProcessorSupplier());




On Tue, Dec 5, 2017 at 4:58 AM, Matthias J. Sax 
wrote:

> Panuwat,
>
> Thanks a lot for the KIP!
>
> Just one nit: `does not follow provide a good` -> spelling: remove
> `follow` ?
>
> Otherwise, looks good to me.
>
>
> -Matthias
>
>
>
> On 12/4/17 10:49 AM, Ted Yu wrote:
> > Looks like you're implying logic similar to this:
> >
> > public synchronized GlobalKTable globalTable(final String topic,
> >                                              final Consumed consumed) {
> >
> >
> > StreamsBuilder is returned instead of GlobalKTable.
> >
> >
> > Can you add code snippet showing how the new API is used ?
> >
> >
> > On Mon, Dec 4, 2017 at 10:09 AM, Panuwat Anawatmongkhon <
> > panuwat.anawatmongk...@gmail.com> wrote:
> >
> >> What I am thinking right now is using the same approach as
> >> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder#
> >> globalTable
> >>
> >> On Mon, 4 Dec 2560 at 23:10 Ted Yu  wrote:
> >>
> >>> Can you describe how sourceName is inferred based on the new API ?
> >>>
> >>> Please fill out JIRA number.
> >>>
> >>> BTW here is the URL for the KIP:
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-233%3A+Simplify+
> >> StreamsBuilder%23addGlobalStore
> >>>
> >>> On Mon, Dec 4, 2017 at 7:39 AM, Panuwat Anawatmongkhon <
> >>> panuwat.anawatmongk...@gmail.com> wrote:
> >>>
>  Hi all,
>  I created a KIP.
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP233%3A+Simplify+
>  StreamsBuilder%23addGlobalStore
> 
>  Cheers,
>  Benz
> 
> >>>
> >>
> >
>
>


[GitHub] kafka pull request #4293: KAFKA-6308: Connect Struct should use deepEquals/d...

2017-12-05 Thread tobiasgies
GitHub user tobiasgies opened a pull request:

https://github.com/apache/kafka/pull/4293

KAFKA-6308: Connect Struct should use deepEquals/deepHashCode

This changes the Struct's equals and hashCode methods to use
Arrays#deepEquals and Arrays#deepHashCode, respectively. This resolves
a problem where two Structs with values of type byte[] would not be considered
equal even though the byte arrays' contents are equal. By using deepEquals,
the byte arrays' contents are compared instead of their identity.

Since this changes the behavior of the equals method for byte array values,
the behavior of hashCode must change alongside it to ensure the methods
still fulfill the general contract of "equal objects must have equal 
hashCodes".

Test rationale:
All existing unit tests for equals were untouched and continue to work.
A new test method was added to verify the behavior of equals and hashCode
for Struct instances that contain a byte array value. I verify the reflexivity and
transitivity of equals, as well as the fact that equal Structs have equal hashCodes
and not-equal Structs do not have equal hashCodes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tobiasgies/kafka feature/kafka-6308-deepequals

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4293.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4293


commit 0e4764d98bdda4246299bfdcc722795ab7834d8e
Author: Tobias Gies 
Date:   2017-12-04T17:19:02Z

KAFKA-6308 use deepEquals instead of equals for values array comparison

commit 4d2ea566179ea1f584cbc5f6304b5c131c72d2a9
Author: Tobias Gies 
Date:   2017-12-04T17:45:04Z

KAFKA-6308 use deepHashCode instead of hashCode to ensure contract 
stability between equals and hashCode

commit 55a3164c27dc2098ab270419032254106c13c533
Author: Tobias Gies 
Date:   2017-12-04T22:02:07Z

Merge branch 'trunk' into feature/kafka-6308-deepequals

commit 3876c1dd2dcde35fd544bb43c2ea124d84860f2a
Author: Tobias Gies 
Date:   2017-12-05T10:26:41Z

KAFKA-6308 restructure test assertions based on method contracts




---
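For illustration, here is a minimal sketch of the pattern the pull request describes (this is not the Connect Struct code itself): Arrays#deepEquals and Arrays#deepHashCode compare nested array contents, so two holders of equal byte[] payloads compare equal and hash identically, which plain equals()/hashCode() would not guarantee:

import java.util.Arrays;

final class ValueHolder {
    private final Object[] values;

    ValueHolder(final Object... values) {
        this.values = values;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof ValueHolder)) return false;
        return Arrays.deepEquals(values, ((ValueHolder) o).values); // compares byte[] contents
    }

    @Override
    public int hashCode() {
        return Arrays.deepHashCode(values); // keeps hashCode consistent with deepEquals
    }

    public static void main(final String[] args) {
        final ValueHolder a = new ValueHolder(new byte[]{1, 2, 3});
        final ValueHolder b = new ValueHolder(new byte[]{1, 2, 3});
        System.out.println(a.equals(b));                  // true: contents compared, not identity
        System.out.println(a.hashCode() == b.hashCode()); // true: consistent with equals
    }
}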


Re: [DISCUSS] KIP-231: Improve the Required ACL of ListGroups API

2017-12-05 Thread Ismael Juma
One comment below.

On Mon, Dec 4, 2017 at 11:40 PM, Dong Lin  wrote:

> In my opinion this changes the semantics of ListGroupsResponse in a
> counter-intuitive way. Usually we use the ACL to determine whether the
> operation on the specified object can be performed or not. The response
> should provide either an error message or the result for the specified
> object. I can't remember a case where the ACL is used to filter the
> result without providing an error.
>

We do this in the metadata request that requests all topics. We just return
the topics that the user has describe access to.

Ismael


[GitHub] kafka pull request #4292: Is there something wrong? (for kafka 1.0)

2017-12-05 Thread helowken
GitHub user helowken opened a pull request:

https://github.com/apache/kafka/pull/4292

Is there something wrong? (for kafka 1.0)

   In the Kafka Streams source code: InternalTopologyBuilder.java


![image](https://user-images.githubusercontent.com/30587995/33597792-79425f28-d9db-11e7-81f6-15e7f81df264.png)

I think it should be: **globalGroups.add(node);**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/kafka 1.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4292.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4292


commit dc907d9b7b9b0ed719c228237640d5049bb8e483
Author: Apurva Mehta 
Date:   2017-10-05T05:27:03Z

KAFKA-6003; Accept appends on replicas unconditionally when local producer 
state doesn't exist

Without this patch, if the replica's log was somehow truncated before
the leader's, it is possible for the replica fetcher thread to
continuously throw an OutOfOrderSequenceException because the
incoming sequence would be non-zero and there is no local state.

This patch changes the behavior so that the replica state is updated to
the leader's state if there was no local state for the producer at the
time of the append.

Author: Apurva Mehta 

Reviewers: Ismael Juma , Jason Gustafson 


Closes #4004 from apurvam/KAFKA-6003-handle-unknown-producer-on-replica

(cherry picked from commit 6ea4fffdd287a0c6a02c1b6dc1006b1a7b614405)
Signed-off-by: Jason Gustafson 

commit 6c01d68c994d966da1cf74e7127473bda2ea3a46
Author: Rajini Sivaram 
Date:   2017-10-05T16:25:34Z

KAFKA-6012; Close request metrics only after closing request handlers

Author: Rajini Sivaram 

Reviewers: Ismael Juma 

Closes #4024 from rajinisivaram/KAFKA-6012-error-metric

(cherry picked from commit e40b3a2e74133de6d60599beefb65407ca4cc7dd)
Signed-off-by: Rajini Sivaram 

commit 5e2767a26b9e4662df301df9fd57843c36e2cdc7
Author: Joel Hamill 
Date:   2017-10-05T16:30:38Z

Rename streams tutorial and quickstart

Changed these topic titles:
- Write your own Streams Applications -> Tutorial: Write a Streams 
Application
- Play with a Streams Application -> Run the Streams Demo Application

Author: Joel Hamill 
Author: Joel Hamill <11722533+joel-ham...@users.noreply.github.com>

Reviewers: Michael G. Noll , Guozhang Wang 


Closes #4017 from joel-hamill/joel-hamill/streams-titles

commit 1d026269e1ab0af130b78f1efadaabbc4f5a8552
Author: Randall Hauch 
Date:   2017-10-05T18:23:11Z

KAFKA-5903: Added Connect metrics to the worker and distributed herder 
(KIP-196)

Added metrics to the Connect worker and rebalancing metrics to the 
distributed herder.

This is built on top of #3987, and I can rebase this PR once that is merged.

Author: Randall Hauch 

Reviewers: Ewen Cheslack-Postava 

Closes #4011 from rhauch/kafka-5903

(cherry picked from commit a47bfbcae050659d32f777ed2f4b26dda5fbdbbd)
Signed-off-by: Ewen Cheslack-Postava 

commit b2bb2c6e8f6d430bd84690f25c2c1a29bc1f3864
Author: Damian Guy 
Date:   2017-10-05T19:55:55Z

KAFKA-5989: resume consumption of tasks that have state stores but no 
changelogging

Stores where logging is disabled were never consumed as the partitions 
were paused, but never resumed.

Author: Damian Guy 

Reviewers: Guozhang Wang 

Closes #4025 from dguy/1.0

commit 42841c9f36c86585b3b7ed7de29baee2b4f8893c
Author: Derrick Or 
Date:   2017-10-05T20:31:18Z

Update docs to reflect kafka trademark status

Updated a couple places in docs with the 'registered' trademark symbol.

Author: Derrick Or 

Reviewers: Jun Rao 

Closes #4028 from derrickdoo/kafka-trademark-status

(cherry picked from commit f29203d022f110761415c9a2ae2c149b4bc5dd21)
Signed-off-by: Jun Rao 

commit b113b4bd3e3802e8e6974e6754e96bd0f1de32e2
Author: Guozhang Wang 
Date:   2017-10-06T00:02:53Z

KAFKA-5576: RocksDB upgrade to 5.8, plus one bug fix on Bytes.wrap

Author: Guozhang Wang 

Reviewers: Ismael Juma