[jira] [Created] (KAFKA-6348) Kafka consumer can't restore from coordinator failure

2017-12-11 Thread Renjie Liu (JIRA)
Renjie Liu created KAFKA-6348:
-

 Summary: Kafka consumer can't restore from coordinator failure
 Key: KAFKA-6348
 URL: https://issues.apache.org/jira/browse/KAFKA-6348
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 0.10.1.1
Reporter: Renjie Liu


Kafka consumer blocks and keep reporting coordinator is dead. I tried to 
restart the process and it still can't work. Then we shutdown the broker and 
restart consumer, but it still keep reporting coordinator is dead. This 
situation continues until we change our group id and it works.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ewen Cheslack-Postava
And to clarify a bit further: the goal is for both standalone and
distributed mode to display the same basic information. This hasn't
*strictly* been required before because standalone had no worker-level
interaction with the cluster (configs stored in memory, offsets on disk,
and statuses in memory). However, we've always *expected* that a reasonable
configuration was available for the worker and that any overrides were just
that -- customizations on top of the existing config. Although it could
have been *possible* to leave an invalid config for the worker yet provide
valid configs for producers and consumers, this was never the intent.

Therefore, the argument here is that we *should* be able to rely on a valid
config to connect to the Kafka cluster, whether in standalone or
distributed mode. There should always be a valid "fallback" even if
overrides are provided. We haven't been explicit about this before, but
unless someone objects, I don't think it is unreasonable.

Happy to update the KIP w/ these details if someone feels they would be
valuable.

-Ewen

On Mon, Dec 11, 2017 at 8:21 PM, Ewen Cheslack-Postava 
wrote:

>
> On Mon, Dec 11, 2017 at 4:01 PM, Gwen Shapira  wrote:
>
>> Thanks, Ewen :)
>>
>> One thing that wasn't clear to me from the wiki: Will standalone connect
>> also have a Kafka cluster ID? While it is true that only tasks have
>> producers and consumers, I think we assumed that all tasks on one
>> stand-alone will use one Kafka cluster?
>>
>
> Yeah, maybe not clear enough in the KIP, but this is what I was getting at
> -- while I think it's possible to use different clusters for worker,
> producer, and consumer, I don't think this is really expected or a use case
> worth bending backwards to support perfectly. In standalone mode,
> technically a value is not required because a default is included and we
> only utilize the value currently for the producers/consumers in tasks. But
> I don't think it is unreasonable to require a valid setting at the worker
> level, even if you override the bootstrap.servers for producer and consumer.
>
>
>>
>> Another suggestion is not to block the REST API on the connection, but
>> rather not return the cluster ID until we know it (return null instead).
>> So
>> clients will need to poll rather than block. Not sure this is better, but
>> you didn't really discuss this, so wanted to raise the option.
>>
>
> It's mentioned briefly in https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+
> ID+in+Connect+REST+API#KIP-238:ExposeKafkaclusterIDinConnectR
> ESTAPI-ProposedChanges I think the tradeoff of blocking the server from
> being "started" until we can at least make one request to the cluster isn't
> unreasonable since if you can't do that, you're not going to be able to do
> any useful work anyway. Anyone who might otherwise be using this endpoint
> to monitor health (which it is useful for since it doesn't require any
> other external services to be running just to give a response) can just
> interpret connection refused or timeouts as an unhealthy state, as they
> should anyway.
>
> -Ewen
>
>
>>
>> Gwen
>>
>>
>> On Mon, Dec 11, 2017 at 3:42 PM Ewen Cheslack-Postava 
>> wrote:
>>
>> > I'd like to start discussion on a simple KIP to expose Kafka cluster ID
>> > info in the Connect REST API:
>> >
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%
>> 3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
>> >
>> > Hopefully straightforward, though there are some details on how this
>> > affects startup behavior that might warrant discussion.
>> >
>> > -Ewen
>> >
>>
>
>


Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient

2017-12-11 Thread Colin McCabe
Sorry... this is probably a silly question, but do Kafka Connect groups
share a namespace with consumer groups?  If we had a separate API for
Kafka Connect groups vs. Consumer groups, would that make sense?  Or
should we unify them?

best,
Colin


On Mon, Dec 11, 2017, at 16:11, Jason Gustafson wrote:
> Hi Jorge,
> 
> Kafka group management is actually more general than consumer groups
> (e.g.
> there are kafka connect groups). If we are adding these APIs, I would
> suggest we consider the more general protocol and how to expose
> group-protocol-specific metadata. For example, it might be reasonable to
> have both an API to access to the low-level bytes as well as some
> higher-level convenience APIs for accessing consumer groups.
> 
> Thanks,
> Jason
> 
> On Mon, Dec 4, 2017 at 4:07 PM, Matthias J. Sax 
> wrote:
> 
> > Jorge,
> >
> > is there any update regarding this KIP?
> >
> >
> > -Matthias
> >
> > On 11/17/17 9:14 AM, Guozhang Wang wrote:
> > > Hello Jorge,
> > >
> > > I made a pass over the wiki, and here are a few comments:
> > >
> > > 1. First, regarding to Tom's comment #2 above, I think if we are only
> > going
> > > to include the String groupId. Then it is Okay to keep as a String than
> > > using a new wrapper class. However, I think we could include the
> > > protocol_type returned from the ListGroupsResponse along with the
> > groupId.
> > > This is a very useful information to tell which consumer groups are from
> > > Connect, which ones are from Streams, which ones are user-customized etc.
> > > With this, it is reasonable to keep a wrapper class.
> > >
> > > 2. In ConsumerDescription, could we also add the state, protocol_type
> > > (these two are form DescribeGroupResponse), and the Node coordinator
> > (this
> > > may be returned from the AdminClient itself) as well? This is also for
> > > information consistency with the old client (note that protocol_type was
> > > called assignment_strategy there).
> > >
> > > 3. With 1) / 2) above, maybe we can rename "ConsumerGroupListing" to
> > > "ConsumerGroupSummary" and make "ConsumerGroupDescription" an extended
> > > class of the former with the additional fields?
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Nov 7, 2017 at 2:13 AM, Jorge Esteban Quilcate Otoya <
> > > quilcate.jo...@gmail.com> wrote:
> > >
> > >> Hi Tom,
> > >>
> > >> 1. You're right. I've updated the KIP accordingly.
> > >> 2. Yes, I have add it to keep consistency, but I'd like to know what
> > others
> > >> think about this too.
> > >>
> > >> Cheers,
> > >> Jorge.
> > >>
> > >> El mar., 7 nov. 2017 a las 9:29, Tom Bentley ()
> > >> escribió:
> > >>
> > >>> Hi again Jorge,
> > >>>
> > >>> A couple of minor points:
> > >>>
> > >>> 1. ConsumerGroupDescription has the member `name`, but everywhere else
> > >> that
> > >>> I've seen the term "group id" is used, so perhaps calling it "id" or
> > >>> "groupId" would be more consistent.
> > >>> 2. I think you've added ConsumerGroupListing for consistency with
> > >>> TopicListing. For topics it makes sense because at well as the name
> > there
> > >>> is whether the topic is internal. For consumer groups, though there is
> > >> just
> > >>> the name and having a separate ConsumerGroupListing seems like it
> > doesn't
> > >>> add very much, and would mostly get in the way when using the API. I
> > >> would
> > >>> be interested in what others thought about this.
> > >>>
> > >>> Cheers,
> > >>>
> > >>> Tom
> > >>>
> > >>> On 6 November 2017 at 22:16, Jorge Esteban Quilcate Otoya <
> > >>> quilcate.jo...@gmail.com> wrote:
> > >>>
> >  Thanks for the feedback!
> > 
> >  @Ted Yu: Links added.
> > 
> >  KIP updated. Changes:
> > 
> >  * `#listConsumerGroups(ListConsumerGroupsOptions options)` added to
> > >> the
> >  API.
> >  * `DescribeConsumerGroupResult` and `ConsumerGroupDescription` classes
> >  described.
> > 
> >  Cheers,
> >  Jorge.
> > 
> > 
> > 
> > 
> >  El lun., 6 nov. 2017 a las 20:28, Guozhang Wang ( > >)
> >  escribió:
> > 
> > > Hi Matthias,
> > >
> > > You meant "list groups" I think?
> > >
> > > Guozhang
> > >
> > > On Mon, Nov 6, 2017 at 11:17 AM, Matthias J. Sax <
> > >>> matth...@confluent.io>
> > > wrote:
> > >
> > >> The main goal of this KIP is to enable decoupling StreamsResetter
> > >>> from
> > >> core module. For this case (ie, using AdminClient within
> > >> StreamsResetter) we get the group.id from the user as command line
> > >> argument. Thus, I think the KIP is useful without "describe group"
> > >> command to.
> > >>
> > >> I am happy to include "describe group" command in the KIP. Just
> > >> want
> > >>> to
> > >> point out, that there is no reason to insist on it IMHO.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 11/6/17 7:06 PM, Guozhang Wang 

Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ewen Cheslack-Postava
On Mon, Dec 11, 2017 at 4:01 PM, Gwen Shapira  wrote:

> Thanks, Ewen :)
>
> One thing that wasn't clear to me from the wiki: Will standalone connect
> also have a Kafka cluster ID? While it is true that only tasks have
> producers and consumers, I think we assumed that all tasks on one
> stand-alone will use one Kafka cluster?
>

Yeah, maybe not clear enough in the KIP, but this is what I was getting at
-- while I think it's possible to use different clusters for worker,
producer, and consumer, I don't think this is really expected or a use case
worth bending backwards to support perfectly. In standalone mode,
technically a value is not required because a default is included and we
only utilize the value currently for the producers/consumers in tasks. But
I don't think it is unreasonable to require a valid setting at the worker
level, even if you override the bootstrap.servers for producer and consumer.


>
> Another suggestion is not to block the REST API on the connection, but
> rather not return the cluster ID until we know it (return null instead). So
> clients will need to poll rather than block. Not sure this is better, but
> you didn't really discuss this, so wanted to raise the option.
>

It's mentioned briefly in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API#KIP-238:ExposeKafkaclusterIDinConnectRESTAPI-ProposedChanges
I think the tradeoff of blocking the server from being "started" until we
can at least make one request to the cluster isn't unreasonable since if
you can't do that, you're not going to be able to do any useful work
anyway. Anyone who might otherwise be using this endpoint to monitor health
(which it is useful for since it doesn't require any other external
services to be running just to give a response) can just interpret
connection refused or timeouts as an unhealthy state, as they should anyway.

-Ewen


>
> Gwen
>
>
> On Mon, Dec 11, 2017 at 3:42 PM Ewen Cheslack-Postava 
> wrote:
>
> > I'd like to start discussion on a simple KIP to expose Kafka cluster ID
> > info in the Connect REST API:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
> >
> > Hopefully straightforward, though there are some details on how this
> > affects startup behavior that might warrant discussion.
> >
> > -Ewen
> >
>


[jira] [Resolved] (KAFKA-5551) StreamThread should not expose methods for testing

2017-12-11 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5551.

Resolution: Fixed

> StreamThread should not expose methods for testing
> --
>
> Key: KAFKA-5551
> URL: https://issues.apache.org/jira/browse/KAFKA-5551
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>
> {{StreamsThread}} currently exposes {{createStreamTask()}} and 
> {{createStandbyTask()}} as {{protected}} in order to inject "test tasks" in 
> unit tests. We should rework this and make both methods {{private}}. Maybe we 
> can introduce a {{TaskSupplier}} similar to {{KafkaClientSupplier}} (however, 
> {{TaskSupplier}} should not be public API and be in package {{internal}}).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2017-12-11 Thread Dong Lin
Hey Jun,

I have updated the KIP based on our discussion. Thanks!

Dong

On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for your comments. Given that client needs to de-serialize the
> metadata anyway, the extra overhead of checking the per-partition version
> for every partition should not be a big concern. Thus it makes sense to use
> leader epoch as the per-partition version instead of creating a global
> metadata version. I will update the KIP to do that.
>
> Regarding the detection of outdated metadata, I think it is possible to
> ensure that client gets latest metadata by fetching from controller. Note
> that this requires extra logic in the controller such that controller
> updates metadata directly in memory without requiring
> UpdateMetadataRequest. But I am not sure the main motivation of this at
> this moment. But this makes controller more like a bottleneck in the
> cluster which we probably want to avoid.
>
> I think we can probably keep the current way of ensuring metadata
> freshness. Currently client will be forced to refresh metadata if broker
> returns error (e.g. NotLeaderForPartition) due to outdated metadata or if
> the metadata does not contain the partition that the client needs. In the
> future, as you previously suggested, we can include per-partition
> leaderEpoch in the FetchRequest/ProduceRequest such that broker can return
> error if the epoch is smaller than cached epoch in the broker. Given that
> this adds more complexity to Kafka, I think we can probably think about
> that leader when we have a specific use-case or problem to solve with
> up-to-date metadata. Does this sound OK?
>
> Thanks,
> Dong
>
>
>
> On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. A few more points below.
>>
>> For dealing with how to prevent a consumer switching from a new leader to
>> an old leader, you suggestion that refreshes metadata on consumer restart
>> until it sees a metadata version >= the one associated with the offset
>> works too, as long as we guarantee that the cached metadata versions on
>> the
>> brokers only go up.
>>
>> The second discussion point is on whether the metadata versioning should
>> be
>> per partition or global. For the partition level versioning, you were
>> concerned about the performance. Given that metadata updates are rare, I
>> am
>> not sure if it's a big concern though. Doing a million if tests is
>> probably
>> going to take less than 1ms. Another thing is that the metadata version
>> seems to need to survive controller failover. In your current approach, a
>> consumer may not be able to wait on the right version of the metadata
>> after
>> the consumer restart since the metadata version may have been recycled on
>> the server side due to a controller failover while the consumer is down.
>> The partition level leaderEpoch survives controller failure and won't have
>> this issue.
>>
>> Lastly, neither your proposal nor mine addresses the issue how to
>> guarantee
>> a consumer to detect that is metadata is outdated. Currently, the consumer
>> is not guaranteed to fetch metadata from every broker within some bounded
>> period of time. Maybe this is out of the scope of your KIP. But one idea
>> is
>> force the consumer to refresh metadata from the controller periodically.
>>
>> Jun
>>
>>
>> On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin  wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much for the comments. Great point particularly regarding (3). I
>> > haven't thought about this before.
>> >
>> > It seems that there are two possible ways where the version number can
>> be
>> > used. One solution is for client to check the version number at the
>> time it
>> > receives MetadataResponse. And if the version number in the
>> > MetadataResponse is smaller than the version number in the client's
>> cache,
>> > the client will be forced to fetch metadata again.  Another solution, as
>> > you have suggested, is for broker to check the version number at the
>> time
>> > it receives a request from client. The broker will reject the request if
>> > the version is smaller than the version in broker's cache.
>> >
>> > I am not very sure that the second solution can address the problem
>> here.
>> > In the scenario described in the JIRA ticket, broker's cache may be
>> > outdated because it has not processed the LeaderAndIsrRequest from the
>> > controller. Thus it may still process client's request even if the
>> version
>> > in client's request is actually outdated. Does this make sense?
>> >
>> > IMO, it seems that we can address problem (3) by saving the metadata
>> > version together with the offset. After consumer starts, it will keep
>> > fetching metadata until the metadata version >= the version saved with
>> the
>> > offset of this partition.
>> >
>> > Regarding problems (1) and (2): Currently we use the version number in
>> the
>> > 

Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-11 Thread Colin McCabe
Hi Dan,

The KIP looks good overall.

On Mon, Dec 11, 2017, at 18:28, Ewen Cheslack-Postava wrote:
> I think the key point is when the kafka admin and user creating topics
> differ. I think a more realistic example of Dan's point (2) is for
> retention. I know that realistically, admins aren't just going to
> randomly
> drop the broker defaults from 1w to 1d without warning anyone (they'd
> likely be fired...). But as a user, I may not know the broker configs, if
> admins have overridden them, etc. I may want a *minimum* of, e.g., 2d.
> But if the broker defaults are higher such that the admins are confident the
> cluster can handle 1w, I'd rather just fall back on the default value.

Right.  I think this API addresses a similar set of use-cases as adding
the "validateOnly" boolean for createTopics.  You shouldn't have to
create a topic to know whether it was possible to create it, or what the
retention will end up being, etc. etc.

> Now, there's arguably a better solution for that case -- allow topic
> configs to express a *minimum* value (or maximum depending on the
> particular config), with the broker config taking precedence if it has a
> smaller value (or larger in the case of maximums). This lets you express
> your minimum requirements but allows the cluster to do more if that's the
> default. However, that would represent a much more significant and
> invasive change, and honestly I think it is more likely to confuse users.

There always need to be topic defaults, though.  If we add a foobar
configuration for topics, existing topics will need to get grandfathered
in with a default foobar.  And they won't be able to set min and max
ranges, because foobars didn't exist back when the old topics were
created.

> 
> @Dan, regarding compatibility, this changes behavior without revving the
> request version number, which normally we only do for things that are
> reasonably considered bugfixes or were it has no compatibility
> implications. In this case, older brokers talking to newer AdminClients
> will presumably return some error. Do we know what the non-null assertion
> gets converted to and if we're happy with the behavior (i.e. will
> applications be able to do something reasonable, distinguish it from some
> completely unrelated error, etc)? Similarly, it's obviously only one
> implementation using the KIP-4 APIs, but do we know what client-side
> validation AdminClient is already doing and whether old AdminClients
> talking to new brokers will see a change in behavior (or do they do
> client-side validation such that old clients simply wouldn't have access
> to this new functionality)?

I think we should bump the API version for this or add a new API key. 
Nothing good is going to happen by pretending like this is compatible
with existing brokers.

Also, I think it would be better just to have a separate function in
AdminClient rather than overloading the behavior of NULL in
describeConfigs.  It's not really that much more effort to have another
AdminClient function, and it's a lot simpler for devs to understand than
magical NULL behavior in describeConfigs.

best,
Colin

> 
> -Ewen
> 
> On Mon, Dec 11, 2017 at 2:11 PM, dan  wrote:
> 
> > Dong,
> >
> > I agree that it *may* be better for a user to be explicit, however there
> > are a couple reasons they may not.
> > 1) a user doesn't even know what the options are. imagine writing a tool
> > for users to create topics that steps them through things:
> >
> > $ kafka-topics.sh --create
> > Give your topic a name: my-fav-topic
> > How many partitions do you want [12]:
> > What is the minimum in set replica size [2]:
> > What is the maximum message size [1MB]:
> > ...
> >
> > 2) a user wants to use broker defaults within reason. say they thinke they
> > want min.cleanable.dirty.ratio=.5 and the default is .6. maybe thats fine,
> > or even better for them. since the person maintaining the actual cluster
> > has put thought in to this config. and as the maintainer keeps working on
> > making the cluster run better they can change and tune things on the
> > cluster level as needed.
> >
> > dan
> >
> >
> > On Wed, Dec 6, 2017 at 11:51 AM, Dong Lin  wrote:
> >
> > > Hey Dan,
> > >
> > > I think you are saying that, if user can read the default config before
> > > creating the topic, then this user can make better decision in what
> > configs
> > > need to be overwritten. The question here is, how user is going to use
> > this
> > > config to make the decision.
> > >
> > > In my understanding, user will compare the default value with expected
> > > value, and override the config to be expected value if they are
> > different.
> > > If this is the only way that the default value can affect user's
> > decision,
> > > then it seems OK for user to directly override the config to the expected
> > > value. I am wondering if this solution has some drawback.
> > >
> > > On the other hand, maybe there is a more advanced 

Re: [VOTE] KIP-218: Make KafkaFuture.Function java 8 lambda compatible

2017-12-11 Thread Colin McCabe
+1 (non-binding)

P.S. Suggest to use whenComplete instead of making addWaiter public.

(The differences is very slight : addWaiter returns void, but
whenComplete returns a future which gets completed with either an
exception if the BiConsumer failed, or the value, otherwise.)

Colin


On Mon, Dec 11, 2017, at 13:14, Ewen Cheslack-Postava wrote:
> +1 (binding)
> 
> -Ewen
> 
> On Mon, Dec 11, 2017 at 12:40 PM, Gwen Shapira  wrote:
> 
> > +1 (binding) - nice API improvement, thanks for driving it!
> >
> > On Mon, Dec 11, 2017 at 11:52 AM Xavier Léauté 
> > wrote:
> >
> > > Thanks Steven, I believe I addressed all the comments. If the it looks
> > good
> > > to you let's move forward on the vote.
> > >
> > > On Sat, Dec 9, 2017 at 12:50 AM Steven Aerts 
> > > wrote:
> > >
> > > > Hello Xavier,
> > > >
> > > > for me it is perfect to take it along.
> > > > I made a few small remarks in your PR.
> > > >
> > > > Thanks
> > > >
> > > > Op za 9 dec. 2017 om 01:29 schreef Xavier Léauté  > >:
> > > >
> > > > > Hi Steve, I just posted in the discussion thread, there's just one
> > tiny
> > > > fix
> > > > > I think would be useful to add while we're making changes to this
> > API.
> > > > > Do you mind having a look?
> > > > >
> > > > > On Fri, Dec 8, 2017 at 11:37 AM Mickael Maison <
> > > mickael.mai...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > +1 (non binding)
> > > > > > Thanks for the KIP
> > > > > >
> > > > > > On Fri, Dec 8, 2017 at 6:53 PM, Tom Bentley  > >
> > > > > wrote:
> > > > > > > +1
> > > > > > >
> > > > > > > On 8 December 2017 at 18:34, Ted Yu  wrote:
> > > > > > >
> > > > > > >> +1
> > > > > > >>
> > > > > > >> On Fri, Dec 8, 2017 at 3:49 AM, Steven Aerts <
> > > > steven.ae...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Hello everybody,
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I think KIP-218 is crystallized enough to start voting.
> > > > > > >> >
> > > > > > >> > KIP documentation:
> > > > > > >> >
> > > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > >> > 218%3A+Make+KafkaFuture.Function+java+8+lambda+compatible
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >Steven
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >


[GitHub] kafka-site pull request #111: KAFKA-6334: fix typo in backwards compatibilit...

2017-12-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka-site/pull/111


---


[jira] [Resolved] (KAFKA-6334) Minor documentation typo

2017-12-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6334.
--
Resolution: Fixed

> Minor documentation typo
> 
>
> Key: KAFKA-6334
> URL: https://issues.apache.org/jira/browse/KAFKA-6334
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Trivial
> Fix For: 1.0.0
>
>
> At [1]:
> {quote}
> 0.11.0 consumers support backwards compatibility with brokers 0.10.0 brokers 
> and upward, so it is possible to upgrade the clients first before the brokers
> {quote}
> Specifically the "brokers 0.10.0 brokers" wording.
> [1] http://kafka.apache.org/documentation.html#upgrade_11_message_format



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-218: Make KafkaFuture.Function java 8 lambda compatible

2017-12-11 Thread Colin McCabe
Thanks, Xavier we should definitely think about what happens when
exceptions are thrown from these functions.

I would suggest maybe we should just implement whenComplete, rather than
exposing addWaiter.  addWaiter was never intended as a public API, and
it's a little weird.  whenComplete is nice because it supports chaining,
and should be more familiar to users of other async APIs.

best,
Colin


On Fri, Dec 8, 2017, at 16:26, Xavier Léauté wrote:
> Hi Steven,
> 
> I noticed you are making KafkaFuture.addWaiter(...) public as part of
> your
> PR. This is a very useful method to add – and you should mention it  in
> the
> KIP – however addWaiter currently doesn't guard against exceptions thrown
> inside of the BiConsumer function, which is something we should probably
> fix before making it public.
> 
> I was about to make the necessary exception handling changes as part of
> https://github.com/apache/kafka/pull/4308 until someone pointed out your
> KIP to me. Since you already have a PR out, it might be worth
> incorporating
> my fixes (and the extra docs), what do you think?
> 
> I'll rebase my PR onto yours to make it easier to merge.
> 
> Thanks!
> Xavier
> 
> 
> On Mon, Dec 4, 2017 at 4:03 AM Steven Aerts 
> wrote:
> 
> > Tom,
> >
> > Thanks for the review.
> > updated the motivation a little bit, it's better, but I have to admit can
> > be improved.
> > I made addWaiters public.
> >
> > Enjoy,
> >
> > Steven
> >
> >
> >
> > Op ma 4 dec. 2017 om 11:01 schreef Tom Bentley :
> >
> > > Hi Steven,
> > >
> > > Thanks for updating the KIP. I have a couple of points:
> > >
> > > 1. Typo in the first sentence of the Motivation. Also what does "empty
> > > public abstract classes with one abstract method" mean -- if it's got one
> > > abstract method in what way is it empty?
> > > 2.From an entirely self-centred point of view, the main thing that's
> > > missing for my work in KIP-183 is that addWaiter() needs to be public.
> > >
> > > Thanks again,
> > >
> > > Tom
> > >
> > > On 2 December 2017 at 10:07, Steven Aerts 
> > wrote:
> > >
> > > > Hi Tom,
> > > >
> > > > I just made changes to the proposal of KIP-218, to make everything more
> > > > backwards compatible as suggested by Collin.
> > > > For me it is now in a state where starts to become final.
> > > >
> > > > I propose to wait a few days so everybody can take a look and open the
> > > > votes when I do not receive any major comments.
> > > >
> > > > Does that sound ok for you?
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 218%3A+Make+KafkaFuture.Function+java+8+lambda+compatible
> > > >
> > > > Thanks for your patience,
> > > >
> > > >
> > > >Steven
> > > >
> > > >
> > > > Op vr 1 dec. 2017 om 11:55 schreef Tom Bentley  > >:
> > > >
> > > > > Hi Steven,
> > > > >
> > > > > I'm particularly interested in seeing progress on this KIP as the
> > work
> > > > for
> > > > > KIP-183 needs a public version of BiConsumer. do you have any idea
> > when
> > > > the
> > > > > KIP might be ready for voting?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Tom
> > > > >
> > > > > On 10 November 2017 at 13:38, Steven Aerts 
> > > > wrote:
> > > > >
> > > > > > Collin, Ben,
> > > > > >
> > > > > > Thanks for the input.
> > > > > >
> > > > > > I will work out this proposa, so I get an idea on the impact.
> > > > > >
> > > > > > Do you think it is a good idea to line up the new method names with
> > > > those
> > > > > > of CompletableFuture?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > >
> > > > > >Steven
> > > > > >
> > > > > > Op vr 10 nov. 2017 om 12:12 schreef Ben Stopford  > >:
> > > > > >
> > > > > > > Sounds like a good middle ground to me. What do you think Steven?
> > > > > > >
> > > > > > > On Mon, Nov 6, 2017 at 8:18 PM Colin McCabe 
> > > > > wrote:
> > > > > > >
> > > > > > > > It would definitely be nice to use the jdk8
> > CompletableFuture.  I
> > > > > think
> > > > > > > > that's a bit of a separate discussion, though, since it has
> > such
> > > > > heavy
> > > > > > > > compatibility implications.
> > > > > > > >
> > > > > > > > How about making KIP-218 backwards compatible?  As a starting
> > > > point,
> > > > > > you
> > > > > > > > can change KafkaFuture#BiConsumer to an interface with no
> > > > > compatibility
> > > > > > > > implications, since there are currently no public functions
> > > exposed
> > > > > > that
> > > > > > > > use it.  That leaves KafkaFuture#Function, which is publicly
> > used
> > > > > now.
> > > > > > > >
> > > > > > > > For the purposes of KIP-218, how about adding a new interface
> > > > > > > > FunctionInterface?  Then you can add a function like this:
> > > > > > > >
> > > > > > > > >  public abstract  KafkaFuture
> > > > thenApply(FunctionInterface > > > > R>
> > > > > > > > function);
> > > > > > 

[GitHub] kafka-site issue #111: KAFKA-6334: fix typo in backwards compatibility note

2017-12-11 Thread guozhangwang
Github user guozhangwang commented on the issue:

https://github.com/apache/kafka-site/pull/111
  
LGTM. Merged to asf-site.


---


[GitHub] kafka pull request #4316: kafka-future-whencomplete

2017-12-11 Thread cmccabe
GitHub user cmccabe opened a pull request:

https://github.com/apache/kafka/pull/4316

kafka-future-whencomplete

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cmccabe/kafka kafka-future-whencomplete

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4316.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4316


commit ceb8ab1b2f341b7657508c1c55396a7049ec6a54
Author: Colin P. Mccabe 
Date:   2017-12-12T02:47:12Z

kafka-future-whencomplete




---


Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-11 Thread Ewen Cheslack-Postava
I think the key point is when the kafka admin and user creating topics
differ. I think a more realistic example of Dan's point (2) is for
retention. I know that realistically, admins aren't just going to randomly
drop the broker defaults from 1w to 1d without warning anyone (they'd
likely be fired...). But as a user, I may not know the broker configs, if
admins have overridden them, etc. I may want a *minimum* of, e.g., 2d. But
if the broker defaults are higher such that the admins are confident the
cluster can handle 1w, I'd rather just fall back on the default value.

Now, there's arguably a better solution for that case -- allow topic
configs to express a *minimum* value (or maximum depending on the
particular config), with the broker config taking precedence if it has a
smaller value (or larger in the case of maximums). This lets you express
your minimum requirements but allows the cluster to do more if that's the
default. However, that would represent a much more significant and invasive
change, and honestly I think it is more likely to confuse users.

@Dan, regarding compatibility, this changes behavior without revving the
request version number, which normally we only do for things that are
reasonably considered bugfixes or were it has no compatibility
implications. In this case, older brokers talking to newer AdminClients
will presumably return some error. Do we know what the non-null assertion
gets converted to and if we're happy with the behavior (i.e. will
applications be able to do something reasonable, distinguish it from some
completely unrelated error, etc)? Similarly, it's obviously only one
implementation using the KIP-4 APIs, but do we know what client-side
validation AdminClient is already doing and whether old AdminClients
talking to new brokers will see a change in behavior (or do they do
client-side validation such that old clients simply wouldn't have access to
this new functionality)?

-Ewen

On Mon, Dec 11, 2017 at 2:11 PM, dan  wrote:

> Dong,
>
> I agree that it *may* be better for a user to be explicit, however there
> are a couple reasons they may not.
> 1) a user doesn't even know what the options are. imagine writing a tool
> for users to create topics that steps them through things:
>
> $ kafka-topics.sh --create
> Give your topic a name: my-fav-topic
> How many partitions do you want [12]:
> What is the minimum in set replica size [2]:
> What is the maximum message size [1MB]:
> ...
>
> 2) a user wants to use broker defaults within reason. say they thinke they
> want min.cleanable.dirty.ratio=.5 and the default is .6. maybe thats fine,
> or even better for them. since the person maintaining the actual cluster
> has put thought in to this config. and as the maintainer keeps working on
> making the cluster run better they can change and tune things on the
> cluster level as needed.
>
> dan
>
>
> On Wed, Dec 6, 2017 at 11:51 AM, Dong Lin  wrote:
>
> > Hey Dan,
> >
> > I think you are saying that, if user can read the default config before
> > creating the topic, then this user can make better decision in what
> configs
> > need to be overwritten. The question here is, how user is going to use
> this
> > config to make the decision.
> >
> > In my understanding, user will compare the default value with expected
> > value, and override the config to be expected value if they are
> different.
> > If this is the only way that the default value can affect user's
> decision,
> > then it seems OK for user to directly override the config to the expected
> > value. I am wondering if this solution has some drawback.
> >
> > On the other hand, maybe there is a more advanced way that the default
> > value can affect the user's decision. It may be useful to understand this
> > use-case more specifically. Could you help provide a specific example
> here?
> >
> > Thanks,
> > Dong
> >
> >
> > On Wed, Dec 6, 2017 at 11:12 AM, dan  wrote:
> >
> > > Rajini,
> > >
> > > that was not my intent, the intent was to give a user of this api an
> > > insight in to what their topic will look like once created. as things
> > stand
> > > now a user is unable to (easily) have any knowledge of what their topic
> > > configs will be before doing a `CREATE_TOPICS`. as i mentioned in the
> > KIP,
> > > another option would be to have the `CreateTopicsOptions.
> > > validateOnly=true`
> > > version return data, but seems more invasive/questionable.
> > >
> > > dan
> > >
> > > On Wed, Dec 6, 2017 at 5:10 AM, Rajini Sivaram <
> rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Hi Dan,
> > > >
> > > > Thank you for the KIP. KIP-226 (https://cwiki.apache.org/
> > > > confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration)
> > > proposes
> > > > to add an option to include all synonyms of a config option when
> > > describing
> > > > configs. This includes any defaults. For example (using Dong's
> > example),
> > > if
> > > > you have 

Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Gwen Shapira
Thanks, Ewen :)

One thing that wasn't clear to me from the wiki: Will standalone connect
also have a Kafka cluster ID? While it is true that only tasks have
producers and consumers, I think we assumed that all tasks on one
stand-alone will use one Kafka cluster?

Another suggestion is not to block the REST API on the connection, but
rather not return the cluster ID until we know it (return null instead). So
clients will need to poll rather than block. Not sure this is better, but
you didn't really discuss this, so wanted to raise the option.

Gwen


On Mon, Dec 11, 2017 at 3:42 PM Ewen Cheslack-Postava 
wrote:

> I'd like to start discussion on a simple KIP to expose Kafka cluster ID
> info in the Connect REST API:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
>
> Hopefully straightforward, though there are some details on how this
> affects startup behavior that might warrant discussion.
>
> -Ewen
>


Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2017-12-11 Thread Jun Rao
Hi, Dong,

The case that I am thinking is network partitioning. Suppose one deploys a
stretched cluster across multiple AZs in the same region. If the machines
in one AZ can't communicate to brokers in other AZs due to a network issue,
the brokers in that AZ won't get any new metadata.

We can potentially solve this problem by requiring some kind of regular
heartbeats between the controller and the broker. This may need some more
thoughts. So, it's probably fine to leave this to another KIP in the future.

Thanks,

Jun

On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks for the comment. I am open to improve this KIP to address more
> problems. I probably need more help in understanding what is the current
> problem with consumer using outdated metadata and whether it is easier to
> address it together with this KIP.
>
> I agree that a consumer can potentially talk to old leader for a long time
> even after this KIP. But after this KIP, the consumer probably should not
> get OffetOutofRangeException and therefore will not cause offset rewind
> issue. So the only problem is that consumer will not be able to fetch data
> until it has updated metadata. It seems that this situation can only happen
> if the broker is too slow in processing LeaderAndIsrRequest since otherwise
> the consumer will be forced to update metadata due to
> NotLeaderForPartitionException. So the problem we are having here is that
> consumer will not be able to fetch data if some broker is too slow in
> processing LeaderAndIsrRequest.
>
> Because Kafka propagates LeaderAndIsrRequest asynchronously to all brokers
> in the cluster, there will always be a period of time when consumer can not
> fetch data for the partition during the leadership change. Thus it seems
> more like a broker-side performance issue instead of client-side
> correctness issue. My gut feel is that it is not causing a much a problem
> as the problem to be fixed in this KIP. And if we were to address it, we
> probably need to make change in the broker side, e.g. with prioritized
> queue for controller-related requests, which may be kind of orthogonal to
> this KIP. I am not very sure it will be easier to address it with the
> change in this KIP. Do you have any recommendation?
>
> Thanks,
> Dong
>
>
> On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the reply.
> >
> > My suggestion of forcing the metadata refresh from the controller may not
> > work in general since the cached controller could be outdated too. The
> > general problem is that if a consumer's metadata is outdated, it may get
> > stuck with the old leader for a long time. We can address the issue of
> > detecting outdated metadata in a separate KIP in the future if you didn't
> > intend to address it in this KIP.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for your comments. Given that client needs to de-serialize
> > the
> > > metadata anyway, the extra overhead of checking the per-partition
> version
> > > for every partition should not be a big concern. Thus it makes sense to
> > use
> > > leader epoch as the per-partition version instead of creating a global
> > > metadata version. I will update the KIP to do that.
> > >
> > > Regarding the detection of outdated metadata, I think it is possible to
> > > ensure that client gets latest metadata by fetching from controller.
> Note
> > > that this requires extra logic in the controller such that controller
> > > updates metadata directly in memory without requiring
> > > UpdateMetadataRequest. But I am not sure the main motivation of this at
> > > this moment. But this makes controller more like a bottleneck in the
> > > cluster which we probably want to avoid.
> > >
> > > I think we can probably keep the current way of ensuring metadata
> > > freshness. Currently client will be forced to refresh metadata if
> broker
> > > returns error (e.g. NotLeaderForPartition) due to outdated metadata or
> if
> > > the metadata does not contain the partition that the client needs. In
> the
> > > future, as you previously suggested, we can include per-partition
> > > leaderEpoch in the FetchRequest/ProduceRequest such that broker can
> > return
> > > error if the epoch is smaller than cached epoch in the broker. Given
> that
> > > this adds more complexity to Kafka, I think we can probably think about
> > > that leader when we have a specific use-case or problem to solve with
> > > up-to-date metadata. Does this sound OK?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > >
> > > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the reply. A few more points below.
> > > >
> > > > For dealing with how to prevent a consumer switching from a new
> leader
> > to
> > > > an old leader, you suggestion that 

[GitHub] kafka pull request #4315: KAFKA-6150: KIP-204 part III; Change repartition t...

2017-12-11 Thread guozhangwang
GitHub user guozhangwang opened a pull request:

https://github.com/apache/kafka/pull/4315

KAFKA-6150: KIP-204 part III; Change repartition topic segment size and ms

1. Create default internal topic configs in StreamsConfig, especially for 
repartition topics change the segment size and time to smaller value.
2. Consolidate the default internal topic settings to InternalTopicManager 
and simplify InternalTopicConfig correspondingly.
3. Add an integration test for purging data.
4. MINOR: change TopologyBuilderException to IllegalStateException in 
StreamPartitionAssignor (part of 
https://issues.apache.org/jira/browse/KAFKA-5660).

Here are a few public facing APIs that get added:

1. AbstractConfig#originalsWithPrefix(String prefix, boolean strip).
2. KafkaStreams constructor with Time object for convienent mocking in 
tests.

Will update KIP-204 accordingly if people re-votes these changes.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka K6150-segment-size

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4315.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4315


commit 7f394817cc0a7a5606ee2cc1bae0d3e271c307b9
Author: Matthias J. Sax 
Date:   2017-11-21T02:17:57Z

KAFKA-4857: Replace StreamsKafkaClient with AdminClient in Kafka Streams

commit 49e832c297b93c432cdbffddca5635dbdecfc565
Author: Matthias J. Sax 
Date:   2017-12-04T19:36:29Z

Github comments
 - fixed parameter passing for `retries`
 - added couple of tests

commit ad7ccde56910a0b52825470fdd6a54339358445d
Author: Matthias J. Sax 
Date:   2017-12-04T21:01:46Z

rebased

commit bfd3937666ecc82e7584083fd1f17d0bacba8962
Author: Matthias J. Sax 
Date:   2017-12-05T05:38:36Z

fixed system test error message

commit 81b17ba0b60e9d11cd6a23d4f91786d499869393
Author: Guozhang Wang 
Date:   2017-12-07T02:53:33Z

Merge branch 'kafka-4857-admit-client' of https://github.com/mjsax/kafka 
into K6150-segment-size

commit 3c286b94e2d698ab41c90668b2b9ada8212ab2d8
Author: Guozhang Wang 
Date:   2017-12-08T00:14:04Z

add internal topic types

commit 6c223b89c0b81769930c86390de7a8f745e71780
Author: Guozhang Wang 
Date:   2017-12-08T02:30:12Z

use default props for internal topics

commit 177d11a400f1574f7c1fbac1e233493e3d43b944
Author: Guozhang Wang 
Date:   2017-12-11T17:36:11Z

Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/kafka into 
K6150-segment-size

commit ef57e80633bd39335bc184bed59d55179d18b69b
Author: Guozhang Wang 
Date:   2017-12-12T01:20:55Z

add integration test for purging repartition topic




---


[jira] [Reopened] (KAFKA-2319) After controlled shutdown: IllegalStateException: Kafka scheduler has not been started

2017-12-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-2319:
--

> After controlled shutdown: IllegalStateException: Kafka scheduler has not 
> been started
> --
>
> Key: KAFKA-2319
> URL: https://issues.apache.org/jira/browse/KAFKA-2319
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.1
>Reporter: Jason Rosenberg
>
> Running 0.8.2.1, just saw this today at the end of a controlled shutdown.  It 
> doesn't happen every time, but I've seen it several times:
> {code}
> 2015-07-07 18:54:28,424  INFO [Thread-4] server.KafkaServer - [Kafka Server 
> 99], Controlled shutdown succeeded
> 2015-07-07 18:54:28,425  INFO [Thread-4] network.SocketServer - [Socket 
> Server on Broker 99], Shutting down
> 2015-07-07 18:54:28,435  INFO [Thread-4] network.SocketServer - [Socket 
> Server on Broker 99], Shutdown completed
> 2015-07-07 18:54:28,435  INFO [Thread-4] server.KafkaRequestHandlerPool - 
> [Kafka Request Handler on Broker 99], shutting down
> 2015-07-07 18:54:28,444  INFO [Thread-4] server.KafkaRequestHandlerPool - 
> [Kafka Request Handler on Broker 99], shut down completely
> 2015-07-07 18:54:28,649  INFO [Thread-4] server.ReplicaManager - [Replica 
> Manager on Broker 99]: Shut down
> 2015-07-07 18:54:28,649  INFO [Thread-4] server.ReplicaFetcherManager - 
> [ReplicaFetcherManager on broker 99] shutting down
> 2015-07-07 18:54:28,650  INFO [Thread-4] server.ReplicaFetcherThread - 
> [ReplicaFetcherThread-0-95], Shutting down
> 2015-07-07 18:54:28,750  INFO [Thread-4] server.ReplicaFetcherThread - 
> [ReplicaFetcherThread-0-95], Shutdown completed
> 2015-07-07 18:54:28,750  INFO [ReplicaFetcherThread-0-95] 
> server.ReplicaFetcherThread - [ReplicaFetcherThread-0-95], Stopped
> 2015-07-07 18:54:28,750  INFO [Thread-4] server.ReplicaFetcherThread - 
> [ReplicaFetcherThread-0-98], Shutting down
> 2015-07-07 18:54:28,791  INFO [Thread-4] server.ReplicaFetcherThread - 
> [ReplicaFetcherThread-0-98], Shutdown completed
> 2015-07-07 18:54:28,791  INFO [ReplicaFetcherThread-0-98] 
> server.ReplicaFetcherThread - [ReplicaFetcherThread-0-98], Stopped
> 2015-07-07 18:54:28,791  INFO [Thread-4] server.ReplicaFetcherManager - 
> [ReplicaFetcherManager on broker 99] shutdown completed
> 2015-07-07 18:54:28,819  INFO [Thread-4] server.ReplicaManager - [Replica 
> Manager on Broker 99]: Shut down completely
> 2015-07-07 18:54:28,826  INFO [Thread-4] log.LogManager - Shutting down.
> 2015-07-07 18:54:30,459  INFO [Thread-4] log.LogManager - Shutdown complete.
> 2015-07-07 18:54:30,463  WARN [Thread-4] utils.Utils$ - Kafka scheduler has 
> not been started
> java.lang.IllegalStateException: Kafka scheduler has not been started
> at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
> at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
> at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
> at 
> kafka.controller.KafkaController.shutdown(KafkaController.scala:664)
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$8.apply$mcV$sp(KafkaServer.scala:285)
> at kafka.utils.Utils$.swallow(Utils.scala:172)
> at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
> at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
> at kafka.utils.Logging$class.swallow(Logging.scala:94)
> at kafka.utils.Utils$.swallow(Utils.scala:45)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:285)
>  ...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ted Yu
I agree the formation in the pull request is the cleanest.

Thanks

On Mon, Dec 11, 2017 at 5:05 PM, Ewen Cheslack-Postava 
wrote:

> I did, but it doesn't seem to gain much. In order to still avoid having
> these intermediate states, you'd still need a latch and then to block any
> calls to the root resource until you could connect. It would allow starting
> up the rest of the worker, but if it's just going to fail and put the
> worker into a bad state anyway, that doesn't seem to help much.
>
> The alternative of just returning incomplete info didn't seem worth the
> hassle for users since if you can't connect to the cluster to get the
> cluster ID, none of the other APIs would be useful either (you're not going
> to be able to write new connector configs, asking for connector state will
> give you empty data since you wouldn't be able to load the configs or
> status topics, etc).
>
> -Ewen
>
> On Mon, Dec 11, 2017 at 4:35 PM, Ted Yu  wrote:
>
> > Looks good overall.
> >
> > Currently lookupKafkaClusterId() is called synchronously. Have you
> > considered making the call asynchronous (normally the GET / request comes
> > sometime after worker start) ?
> >
> > Thanks
> >
> > On Mon, Dec 11, 2017 at 3:40 PM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> > > I'd like to start discussion on a simple KIP to expose Kafka cluster ID
> > > info in the Connect REST API:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
> > >
> > > Hopefully straightforward, though there are some details on how this
> > > affects startup behavior that might warrant discussion.
> > >
> > > -Ewen
> > >
> >
>


Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ewen Cheslack-Postava
I did, but it doesn't seem to gain much. In order to still avoid having
these intermediate states, you'd still need a latch and then to block any
calls to the root resource until you could connect. It would allow starting
up the rest of the worker, but if it's just going to fail and put the
worker into a bad state anyway, that doesn't seem to help much.

The alternative of just returning incomplete info didn't seem worth the
hassle for users since if you can't connect to the cluster to get the
cluster ID, none of the other APIs would be useful either (you're not going
to be able to write new connector configs, asking for connector state will
give you empty data since you wouldn't be able to load the configs or
status topics, etc).

-Ewen

On Mon, Dec 11, 2017 at 4:35 PM, Ted Yu  wrote:

> Looks good overall.
>
> Currently lookupKafkaClusterId() is called synchronously. Have you
> considered making the call asynchronous (normally the GET / request comes
> sometime after worker start) ?
>
> Thanks
>
> On Mon, Dec 11, 2017 at 3:40 PM, Ewen Cheslack-Postava 
> wrote:
>
> > I'd like to start discussion on a simple KIP to expose Kafka cluster ID
> > info in the Connect REST API:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
> >
> > Hopefully straightforward, though there are some details on how this
> > affects startup behavior that might warrant discussion.
> >
> > -Ewen
> >
>


Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ted Yu
Looks good overall.

Currently lookupKafkaClusterId() is called synchronously. Have you
considered making the call asynchronous (normally the GET / request comes
sometime after worker start) ?

Thanks

On Mon, Dec 11, 2017 at 3:40 PM, Ewen Cheslack-Postava 
wrote:

> I'd like to start discussion on a simple KIP to expose Kafka cluster ID
> info in the Connect REST API:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
>
> Hopefully straightforward, though there are some details on how this
> affects startup behavior that might warrant discussion.
>
> -Ewen
>


[jira] [Resolved] (KAFKA-6341) 'networkThreadTimeNanos' in KafkaChannel is not thread safe

2017-12-11 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6341.
---
Resolution: Not A Bug

> 'networkThreadTimeNanos' in KafkaChannel is not thread safe
> ---
>
> Key: KAFKA-6341
> URL: https://issues.apache.org/jira/browse/KAFKA-6341
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 1.0.0
>Reporter: huxihx
>
> `networkThreadTimeNanos` in KafkaChannel is of primitive long type which is 
> not thread safe. Multiple Processor threads could access(read and write) this 
> variable at the same time. Since JVM spec does not guarantee of the atomic 
> 64-bit operations against long/double types, it's safer to employ AtomicLong 
> instead of the naive long type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6347) Starting offset breach based log segment deletion never considers active segment

2017-12-11 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6347:


 Summary: Starting offset breach based log segment deletion never 
considers active segment
 Key: KAFKA-6347
 URL: https://issues.apache.org/jira/browse/KAFKA-6347
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Guozhang Wang


This observation is related to KIP-107: in 
{{Log#deleteLogStartOffsetBreachedSegments}}, we designed the predicate to 
enforce that the current segment can only be considered for deletion, when it 
1) has the next segment, 2) the next segment's starting offset is no larger 
than the log start offset.

{code}
def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
  nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
{code}

This means that, the current active segment would never be considered for 
purging. Normally this is OK given that we will eventually roll out a new 
segment. However, with the default size of segment of 1GB it means that with 
the purge data API we are in the worst case not being able to purge the data 
for up to 1GB per topic partition.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-222 - Add "describe consumer group" to KafkaAdminClient

2017-12-11 Thread Jason Gustafson
Hi Jorge,

Kafka group management is actually more general than consumer groups (e.g.
there are kafka connect groups). If we are adding these APIs, I would
suggest we consider the more general protocol and how to expose
group-protocol-specific metadata. For example, it might be reasonable to
have both an API to access to the low-level bytes as well as some
higher-level convenience APIs for accessing consumer groups.

Thanks,
Jason

On Mon, Dec 4, 2017 at 4:07 PM, Matthias J. Sax 
wrote:

> Jorge,
>
> is there any update regarding this KIP?
>
>
> -Matthias
>
> On 11/17/17 9:14 AM, Guozhang Wang wrote:
> > Hello Jorge,
> >
> > I made a pass over the wiki, and here are a few comments:
> >
> > 1. First, regarding to Tom's comment #2 above, I think if we are only
> going
> > to include the String groupId. Then it is Okay to keep as a String than
> > using a new wrapper class. However, I think we could include the
> > protocol_type returned from the ListGroupsResponse along with the
> groupId.
> > This is a very useful information to tell which consumer groups are from
> > Connect, which ones are from Streams, which ones are user-customized etc.
> > With this, it is reasonable to keep a wrapper class.
> >
> > 2. In ConsumerDescription, could we also add the state, protocol_type
> > (these two are form DescribeGroupResponse), and the Node coordinator
> (this
> > may be returned from the AdminClient itself) as well? This is also for
> > information consistency with the old client (note that protocol_type was
> > called assignment_strategy there).
> >
> > 3. With 1) / 2) above, maybe we can rename "ConsumerGroupListing" to
> > "ConsumerGroupSummary" and make "ConsumerGroupDescription" an extended
> > class of the former with the additional fields?
> >
> >
> >
> > Guozhang
> >
> >
> > On Tue, Nov 7, 2017 at 2:13 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> >> Hi Tom,
> >>
> >> 1. You're right. I've updated the KIP accordingly.
> >> 2. Yes, I have add it to keep consistency, but I'd like to know what
> others
> >> think about this too.
> >>
> >> Cheers,
> >> Jorge.
> >>
> >> El mar., 7 nov. 2017 a las 9:29, Tom Bentley ()
> >> escribió:
> >>
> >>> Hi again Jorge,
> >>>
> >>> A couple of minor points:
> >>>
> >>> 1. ConsumerGroupDescription has the member `name`, but everywhere else
> >> that
> >>> I've seen the term "group id" is used, so perhaps calling it "id" or
> >>> "groupId" would be more consistent.
> >>> 2. I think you've added ConsumerGroupListing for consistency with
> >>> TopicListing. For topics it makes sense because at well as the name
> there
> >>> is whether the topic is internal. For consumer groups, though there is
> >> just
> >>> the name and having a separate ConsumerGroupListing seems like it
> doesn't
> >>> add very much, and would mostly get in the way when using the API. I
> >> would
> >>> be interested in what others thought about this.
> >>>
> >>> Cheers,
> >>>
> >>> Tom
> >>>
> >>> On 6 November 2017 at 22:16, Jorge Esteban Quilcate Otoya <
> >>> quilcate.jo...@gmail.com> wrote:
> >>>
>  Thanks for the feedback!
> 
>  @Ted Yu: Links added.
> 
>  KIP updated. Changes:
> 
>  * `#listConsumerGroups(ListConsumerGroupsOptions options)` added to
> >> the
>  API.
>  * `DescribeConsumerGroupResult` and `ConsumerGroupDescription` classes
>  described.
> 
>  Cheers,
>  Jorge.
> 
> 
> 
> 
>  El lun., 6 nov. 2017 a las 20:28, Guozhang Wang ( >)
>  escribió:
> 
> > Hi Matthias,
> >
> > You meant "list groups" I think?
> >
> > Guozhang
> >
> > On Mon, Nov 6, 2017 at 11:17 AM, Matthias J. Sax <
> >>> matth...@confluent.io>
> > wrote:
> >
> >> The main goal of this KIP is to enable decoupling StreamsResetter
> >>> from
> >> core module. For this case (ie, using AdminClient within
> >> StreamsResetter) we get the group.id from the user as command line
> >> argument. Thus, I think the KIP is useful without "describe group"
> >> command to.
> >>
> >> I am happy to include "describe group" command in the KIP. Just
> >> want
> >>> to
> >> point out, that there is no reason to insist on it IMHO.
> >>
> >>
> >> -Matthias
> >>
> >> On 11/6/17 7:06 PM, Guozhang Wang wrote:
> >>> A quick question: I think we do not yet have the `list consumer
>  groups`
> >>> func as in the old AdminClient. Without this `describe group`
> >> given
>  the
> >>> group id would not be very useful. Could you include this as well
> >>> in
> > your
> >>> KIP? More specifically, you can look at
> >> kafka.admin.AdminClientfor
>  more
> >>> details on the APIs.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>> On Mon, Nov 6, 2017 at 7:22 AM, Ted Yu 
> >>> wrote:
> >>>
>  Please fill out Discussion 

答复: 答复: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead metrics to KafkaConsumer

2017-12-11 Thread Hu Xi
Hi Mickael Maison,


Thanks for the comments, but I think it deserves another KIP since this one 
mainly focuses on metrics things. The way to calculate the lag in 
ConsumerGroupCommand is seemingly different from what metrics do, so a new KIP 
might be required. What do you think?


huxihx



发件人: Mickael Maison 
发送时间: 2017年12月11日 18:17
收件人: dev@kafka.apache.org
主题: Re: 答复: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead 
metrics to KafkaConsumer

Sorry to only raise this now, but should we also update the
kafka-consumer-groups tool to display the start offset (and possibly
the lead) ?

Apart from that I'm +1 (non binding)
Thanks

On Mon, Dec 11, 2017 at 4:19 AM, Guozhang Wang  wrote:
> The by-laws ask for 72 hours only, since the starting of the vote, and
> since you have three binding votes you can close this voting now.
>
> Please conclude by a summary of the voting status including non-binding and
> binding votes, thanks.
>
>
> Guozhang
>
> On Sun, Dec 10, 2017 at 8:10 PM, Hu Xi  wrote:
>
>> Hi all,  Would we safely accept this KIP since three binding votes have
>> already been collected (from Jun, Guozhang and Becket)?
>>
>>
>> 
>> 发件人: Guozhang Wang 
>> 发送时间: 2017年12月6日 22:40
>> 收件人: dev@kafka.apache.org
>> 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> metrics to KafkaConsumer
>>
>> Hi Hu Xi,
>>
>> As I said before, it is only a clarification question for its internal
>> implementation; it is not related to the public interfaces.
>>
>>
>> Guozhang
>>
>> On Wed, Dec 6, 2017 at 12:34 AM, Hu Xi  wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> > Correct me if I am wrong. Of course I could construct the sensor where
>> > per-partition metrics reside as a child of the client-level
>> > "records-lead-min", but it seems that whether doing that way takes no
>> > effects on what this KIP gonna do,  so is it a must?
>> >
>> > 
>> > 发件人: Guozhang Wang 
>> > 发送时间: 2017年12月6日 15:05
>> > 收件人: dev@kafka.apache.org
>> > 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> > metrics to KafkaConsumer
>> >
>> > Hello Xi,
>> >
>> > You can see that in o.a.k.common.metrics.Sensor, we allow constructors to
>> > pass in one or more "parent" Sensors of the constructed Sensor, behind
>> the
>> > scene when a child sensor's metrics have been updated, the updates will
>> be
>> > propagated all the way up to its parents and ancestors (you can checkout
>> > the source code for its impl). On different clients we have been using
>> this
>> > to build many hierarchical sensors, like per-dest-broker metrics v.s.
>> > all-dest-broker metrics on selector, etc.
>> >
>> > My understanding is that the cross-all-partitions "records-lead-min" will
>> > be constructed as the parent of all the per-partition "records-lead-min",
>> > is that true?
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Dec 4, 2017 at 11:26 PM, Hu Xi  wrote:
>> >
>> > > Guozhang,
>> > >
>> > >
>> > > Thanks for the vote and comments. I am not sure if I fully understand
>> the
>> > > parent metrics here. This KIP will introduce a client-level metric
>> named
>> > > 'records-lead-min' and three per-partition metrics tagged with
>> > > topic Is it the child-parent relationship you mean?
>> > >
>> > >
>> > > 
>> > > 发件人: Guozhang Wang 
>> > > 发送时间: 2017年12月5日 15:16
>> > > 收件人: dev@kafka.apache.org
>> > > 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> > > metrics to KafkaConsumer
>> > >
>> > > Thanks Hu Xi,
>> > >
>> > > I made a pass over the KIP and it lgtm. +1.
>> > >
>> > > Just a clarification question: for the cross-partition
>> "records-lead-min"
>> > > metric, would that be implemented as a parent metric of the
>> per-partition
>> > > metrics?
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Mon, Dec 4, 2017 at 3:07 PM, Dong Lin  wrote:
>> > >
>> > > > +1 (non-binding)
>> > > >
>> > > > On Wed, Nov 29, 2017 at 7:05 PM, Hu Xi  wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > As I didn't see any further discussion around this KIP, I'd like to
>> > > start
>> > > > > voting.
>> > > > >
>> > > > > KIP documentation:
>> > > > >
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > 223+-+Add+per-topic+min+lead+and+per-partition+lead+
>> > > > > metrics+to+KafkaConsumer
>> > > > >
>> > > > >
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > huxihx
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang


Re: [VOTE] KIP-225 - Use tags for consumer “records.lag” metrics

2017-12-11 Thread Jason Gustafson
+1. Thanks for the KIP.

On Mon, Dec 11, 2017 at 1:54 AM, charly molter 
wrote:

> Hi,
> The KIP has been updated. As it has change should I restart the vote?
>
> In any case I'm still missing one binding vote if anyone wants to help.
> Thanks!
>
> On Wed, Dec 6, 2017 at 6:13 PM, charly molter 
> wrote:
>
> > Sounds good I'll update the KIP
> >
> > On Wed, Dec 6, 2017 at 6:04 PM, Becket Qin  wrote:
> >
> >> Hi Charly,
> >>
> >> Personally I prefer emitting both and deprecate old one. This does not
> >> block on the 2.0 release and we don't need to worry about more users
> >> picking up the old metric in 1.1 release.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Tue, Dec 5, 2017 at 4:08 AM, charly molter 
> >> wrote:
> >>
> >> > Thanks Jun and Becket!
> >> >
> >> > I think your point about 1.0 vs 2.0 makes sense I can update the KIP
> to
> >> > reflect this.
> >> >
> >> > What's the process for 2.0 contributions as I can see that trunk is
> 1.1
> >> and
> >> > no 2.x branch?
> >> >
> >> > Here's what I can do:
> >> > - Not write the code change until trunk moves to 2.0.
> >> > - Write the change but leave the PR open until we start working on
> 2.0.
> >> > - Stall this KIP until 2.0 development starts (IIRC it's pretty soon).
> >> > - Do it in a backward compatible way (publish both sets of metrics)
> and
> >> > open a Jira tagged on 2.0 to remove the old metrics.
> >> >
> >> > Let me know what's the right way to go.
> >> >
> >> > Thanks!
> >> >
> >> >
> >> > On Tue, Dec 5, 2017 at 12:45 AM, Becket Qin 
> >> wrote:
> >> >
> >> > > Thanks for the KIP, Charly.
> >> > >
> >> > > +1. The proposal looks good to me. I agree with Jun that it is
> better
> >> to
> >> > > make the metrics consistent with other metrics. That being said,
> >> arguably
> >> > > this is a backwards incompatible change. Since we are at 1.0,
> >> backwards
> >> > > incompatible changes are supposed to be in 2.0. Not sure if that is
> >> the
> >> > > plan or not.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > > On Mon, Dec 4, 2017 at 4:20 PM, Jun Rao  wrote:
> >> > >
> >> > > > Hi, Jiangjie,
> >> > > >
> >> > > > Since you proposed the original KIP-92, do you want to see if this
> >> KIP
> >> > > > makes sense?
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > > On Wed, Nov 22, 2017 at 2:48 AM, charly molter <
> >> > charly.mol...@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > Hi,
> >> > > > >
> >> > > > > I would like to start the voting thread for KIP-225.
> >> > > > > This KIP proposes to correct some lag metrics emitted by the
> >> > consumer.
> >> > > > >
> >> > > > > The KIP wiki is here:
> >> > > > > https://cwiki.apache.org/confluence/x/uaBzB
> >> > > > >
> >> > > > > The discussion thread is here:
> >> > > > > http://search-hadoop.com/m/Kafka/uyzND1F33uL19AYx/threaded
> >> > > > >
> >> > > > > Also could someone assign me to this Jira: KAFKA-5890
> >> > > > > 
> >> > > > >
> >> > > > > Thanks,
> >> > > > > --
> >> > > > > Charly Molter
> >> > > > >
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > Charly Molter
> >> >
> >>
> >
> >
> >
> > --
> > Charly Molter
> >
>
>
>
> --
> Charly Molter
>


Re: [DISCUSS] KIP-229: DeleteGroups API

2017-12-11 Thread Jason Gustafson
Hi Vahid,

Seems reasonable to me. A few questions.

1. What error code do we use when the group is not empty?
2. Do we need the capability to delete offsets individually for a
partition? I ask because the offset tool lets you set the offset for
partitions individually, so it might be natural to expect you could also
delete them individually. We could leave this as potential future work if
the use case is not too compelling.
3. Looks like you've added this API to the internal AdminClient. Wanted to
check if that was intentional. Since the public AdminClient doesn't support
any of the group admin capabilities, I think it's reasonable to leave it
out of this KIP.

Thanks,
Jason

On Mon, Dec 11, 2017 at 2:19 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> If there are no additional feedback on this KIP, I'll start a vote in a
> couple of days.
>
> Thanks.
> --Vahid
>
>
>
> From:   Vahid S Hashemian/Silicon Valley/IBM
> To: dev@kafka.apache.org
> Date:   11/29/2017 11:15 AM
> Subject:Re: [DISCUSS] KIP-229: DeleteGroups API
>
>
> Hi Dong,
>
> Thanks a lot for your feedback.
> I updated the KIP and included those fields and also made a note of the
> corresponding AdminClient API that will be created to support deleting
> consumer groups.
> I hope the updates address your suggestions.
>
> Cheers!
>
> --Vahid
>
>
>
>
>
> From:   Dong Lin 
> To: dev@kafka.apache.org
> Date:   11/28/2017 11:16 PM
> Subject:Re: [DISCUSS] KIP-229: DeleteGroups API
>
>
>
> Hey Vahid,
>
> Thanks for the KIP! This is certainly a useful one and users have been
> asking about the ability to delete group from the Kafka offset topic in my
> past experience.
>
> It seems that the protocol of the new request/response should probably
> include more fields fields. For example, it may be useful to include
> throttle_time_ms field and a request level error code in
> DeleteGroupsResponse. The request level error code can possibly show error
> such as NOT_COORDINATOR_FOR_GROUP.
>
> Also, user may want to use this feature programmatically. Do you think we
> should add a corresponding API in AminClient to delete groups? If so, can
> you specify the new AdminClient API in the KIP?
>
> Thanks,
> Dong
>
>
> On Tue, Nov 28, 2017 at 4:03 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
>
> > Hi everyone,
> >
> > I started KIP-229 and proposed a consumer group deletion API for
> > Kafka-based group management to address KAFKA-6275:
> >
> https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.
> apache.org_confluence_display_KAFKA_KIP-2D=DwIBaQ=jf_
> iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-
> kjJc7uSVcviKUc=97ut7r-oek0jZuRXs-VmWvs_86JChFctd2xFoP4Y2tI=ZT_
> XH0rdqPD82T3oMqwcsAi19zCXhq9Zkh9bfffSYLk=
>
> > 229%3A+DeleteGroups+API
> > Your feedback and suggestions are welcome!
> >
> > Thanks.
> > --Vahid
> >
> >
> >
>
>
>
>
>
>


[DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-11 Thread Ewen Cheslack-Postava
I'd like to start discussion on a simple KIP to expose Kafka cluster ID
info in the Connect REST API:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API

Hopefully straightforward, though there are some details on how this
affects startup behavior that might warrant discussion.

-Ewen


[GitHub] kafka pull request #4314: KAFKA-6331: Expose Kafka cluster ID in Connect RES...

2017-12-11 Thread ewencp
GitHub user ewencp opened a pull request:

https://github.com/apache/kafka/pull/4314

KAFKA-6331: Expose Kafka cluster ID in Connect REST API (KIP-238)

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

Simple unit tests sufficiently exercise the behavior. In fact, this 
addition increases coverage since `RootResource` was not previously unit tested.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ewencp/kafka 
expose-kafka-cluster-id-in-connect-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4314.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4314


commit b6574388cade08777b247133cae48d61376b8800
Author: Ewen Cheslack-Postava 
Date:   2017-12-11T23:12:51Z

KAFKA-6331: Expose Kafka cluster ID in Connect REST API (KIP-238)




---


[GitHub] kafka pull request #4313: MINOR: broker down for significant amt of time sys...

2017-12-11 Thread bbejeck
GitHub user bbejeck opened a pull request:

https://github.com/apache/kafka/pull/4313

MINOR: broker down for significant amt of time system test

System test where a broker is offline more than the configured timeouts.  
In this case:
- Max poll interval set to 45 secs
- Retries set to 2
- Request timeout set to 15 seconds
- Max block ms set to 30 seconds

The broker was taken off-line for 70 seconds or more than double request 
timeout * num retries

[passing system test 
results](uhttp://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2017-12-11--001.1513034559--bbejeck--KSTREAMS_1179_broker_down_for_significant_amt_of_time--6ab4802/report.html)

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bbejeck/kafka 
KSTREAMS_1179_broker_down_for_significant_amt_of_time

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4313


commit f21702a454122ec1f04af010de873a24ee89d18a
Author: Bill Bejeck 
Date:   2017-12-08T21:14:47Z

MINOR: add system test for streams resilience to broker outages

commit ddaa775ec18db333e81b35d618f4896277b5c25c
Author: Bill Bejeck 
Date:   2017-12-08T21:34:43Z

fix parameters for test

commit 39042ad809d34b216b504748f01350ceea6257dc
Author: Bill Bejeck 
Date:   2017-12-08T21:55:55Z

fix parameters for test in service

commit 7a493362fe63666db782c6e101cbc2f206a66e31
Author: Bill Bejeck 
Date:   2017-12-08T21:57:42Z

name parameters according to format

commit 0a5d4db0707cdaab6897055ff2ccae986678de78
Author: Bill Bejeck 
Date:   2017-12-08T22:24:17Z

update start message to expected text for framework

commit 609c96328e2dc2a6e798066782a13f23094f2df2
Author: Bill Bejeck 
Date:   2017-12-11T18:40:56Z

changed to use correct methods to start and stop kafka node

commit 950c4aba80a6fa500e519d52ca0b9edabdfad97b
Author: Bill Bejeck 
Date:   2017-12-11T19:03:04Z

fix references to methods

commit 3af19698d93b10403ede987fbd76c5435a7a4d88
Author: Bill Bejeck 
Date:   2017-12-11T19:23:50Z

moved methods to test method

commit 91847ea3344d99493e3c5812e649334b8e67eadf
Author: Bill Bejeck 
Date:   2017-12-11T19:55:15Z

fix args for starting test

commit 673cf8a86e9f97647dcaab29139f5deefcacd5fa
Author: Bill Bejeck 
Date:   2017-12-11T20:22:44Z

fix args for passing in None

commit 95b61a8344da27435a98ca0f9d6fa5fbb80197ec
Author: Bill Bejeck 
Date:   2017-12-11T21:20:39Z

reduced timeout and verify by using consumer

commit b9dea8cb31fce4665a320a9fecc38b049ba13f83
Author: Bill Bejeck 
Date:   2017-12-11T21:52:45Z

increase verify consumer session timeout

commit d77a6507b66ff40e0dea33b2c0a6ee493cd7d674
Author: Bill Bejeck 
Date:   2017-12-11T22:28:44Z

parse stdout file for success message

commit 46c5aaa91cba6bf4c456096064c522e045e34f0d
Author: Bill Bejeck 
Date:   2017-12-11T22:33:14Z

formatting fixes

commit b92456804af0b1e43254354395d18b79a15d2f43
Author: Bill Bejeck 
Date:   2017-12-11T22:34:52Z

more formatting

commit 8f5c9cc253803fa0341a2b6cffa350ad0e91ce45
Author: Bill Bejeck 
Date:   2017-12-11T22:39:16Z

fix times

commit 6ab4802cb7bcc46e95448d147bb708a84b6fe480
Author: Bill Bejeck 
Date:   2017-12-11T23:08:45Z

get rid of VerifiableConsumer, not using




---


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Colin McCabe
On Mon, Dec 11, 2017, at 14:51, Becket Qin wrote:
> Hi Jun,
> 
> Yes, I agree avoiding reading the log segment is not the primary goal for
> this KIP. I brought this up because recently I saw a significant
> throughput
> impact when a broker is down for 20 - 30 min and rejoins a cluster. The
> bytes in rate could drop by 50% when that broker is trying to catch up
> with
> the leaders even in a big cluster (a single broker should not have such
> big
> impact on the entire cluster).

Hi Becket,

It sounds like the broker was fetching older data which wasn't in the
page cache?  That sounds like it could definitely have a negative impact
on the cluster.  It is a little troubling if the impact is a 50% drop in
throughput, though.

It's a little unclear how to mitigate this, since old data is definitely
not going to be in memory.  Maybe we need to work on making sure that
slow fetches going on by one fetcher do not slow down all the other
worker threads...?

> And some users also reported such cascading
> degradation, i.e. when one consumer lags behind, the other consumers will
> also start to lag behind. So I think addressing this is an important
> improvement. I will run some test and see if returning at index boundary
> to avoid the log scan would help address this issue. That being said, I
> agree that we don't have to address this issue in this KIP. I can submit
> another KIP later if avoiding the log segment scan helps.

Thanks, that's really interesting.

I agree that it might be better in a follow-on KIP.

Is the goal to improve the cold-cache case?  Maybe avoid looking at the
index file altogether (except for the initial setup)?  That would be a
nice improvement for consumers fetching big sequential chunks of
historic data.

regards,
Colin


> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin  wrote:
> 
> > Hey Colin,
> >
> > I went over the latest KIP wiki and have a few comments here.
> >
> > 1) The KIP says that client ID is a string if the session belongs to a
> > Kafka consumer. And it is a numerical follower Id if the session belongs to
> > a follower. Can we have a consistent type for the client Id?
> >
> > 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> > broker". If the broker has multiple replica fetcher thread, do they all
> > have the same follower Id in teh leader broker?
> >
> > 3) One of the condition for evicting an existing session is that "The new
> > session belongs to a follower, and the existing session belongs to a
> > regular consumer". I am not sure the session from follower should also be
> > restricted by the newly added config. It seems that we will always create
> > lots for FetchRequest from follower brokers. Maybe the
> > "max.incremental.fetch.session.cache.slots" should only be applies if the
> > FetchRequest comes from a client consumer?
> >
> > 4) Not sure I fully understand how the "The last dirty sequence number" is
> > used. It is mentioned that "Let P1 have a last dirty sequence number of
> > 100, and P2 have a last dirty sequence number of 101. An incremental fetch
> > request with sequence number 100 will return information about both P1 and
> > P2." But would be the fetch offset for P2 in this case, if the last fetch
> > offset stored in the Fetch Session for P2 is associated with the last dirty
> > sequence number 101 for P2? My gut feel is that you would have to stored
> > the fetch offset for sequence number 100 for P2 as well. Did I miss
> > something here?
> >
> > Thanks,
> > Dong
> >
> > On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin  wrote:
> >
> > > Hi Jun,
> > >
> > > I see. Yes, that makes sense. Are we going to do that only for the
> > fetches
> > > whose per partition fetch size cannot reach the first index entry after
> > the
> > > fetch position, or are we going to do that for any fetch? If we do that
> > for
> > > any fetch, then we will still need to read the actual log segment, which
> > > could be expensive if the data is no longer in the cache. This hurts
> > > performance if some fetches are on the old log segments.
> > >
> > > I took a quick look on the clusters we have. The idle topic ratio varies
> > > depending on the usage of the cluster. For our metric cluster and
> > database
> > > replication clusters almost all the topics are actively used. For
> > tracking
> > > clusters, ~70% topics have data coming in at different rate. For other
> > > clusters such as queuing and data deployment. There are more idle topics
> > > and the traffic is more bursty (I don't have the exact number here).
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe 
> > wrote:
> > >
> > > > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > > > Hi, Jiangjie,
> > > > >
> > > > > What I described is almost the same as yours. The only extra thing is
> > > to
> > > > > scan the log segment 

Re: [DISCUSS] KIP-229: DeleteGroups API

2017-12-11 Thread Vahid S Hashemian
If there are no additional feedback on this KIP, I'll start a vote in a 
couple of days.

Thanks.
--Vahid



From:   Vahid S Hashemian/Silicon Valley/IBM
To: dev@kafka.apache.org
Date:   11/29/2017 11:15 AM
Subject:Re: [DISCUSS] KIP-229: DeleteGroups API


Hi Dong,

Thanks a lot for your feedback.
I updated the KIP and included those fields and also made a note of the 
corresponding AdminClient API that will be created to support deleting 
consumer groups.
I hope the updates address your suggestions.

Cheers!

--Vahid





From:   Dong Lin 
To: dev@kafka.apache.org
Date:   11/28/2017 11:16 PM
Subject:Re: [DISCUSS] KIP-229: DeleteGroups API



Hey Vahid,

Thanks for the KIP! This is certainly a useful one and users have been
asking about the ability to delete group from the Kafka offset topic in my
past experience.

It seems that the protocol of the new request/response should probably
include more fields fields. For example, it may be useful to include
throttle_time_ms field and a request level error code in
DeleteGroupsResponse. The request level error code can possibly show error
such as NOT_COORDINATOR_FOR_GROUP.

Also, user may want to use this feature programmatically. Do you think we
should add a corresponding API in AminClient to delete groups? If so, can
you specify the new AdminClient API in the KIP?

Thanks,
Dong


On Tue, Nov 28, 2017 at 4:03 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi everyone,
>
> I started KIP-229 and proposed a consumer group deletion API for
> Kafka-based group management to address KAFKA-6275:
> 
https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_KAFKA_KIP-2D=DwIBaQ=jf_iaSHvJObTbx-siA1ZOg=Q_itwloTQj3_xUKl7Nzswo6KE4Nj-kjJc7uSVcviKUc=97ut7r-oek0jZuRXs-VmWvs_86JChFctd2xFoP4Y2tI=ZT_XH0rdqPD82T3oMqwcsAi19zCXhq9Zkh9bfffSYLk=

> 229%3A+DeleteGroups+API
> Your feedback and suggestions are welcome!
>
> Thanks.
> --Vahid
>
>
>







Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Colin McCabe
On Mon, Dec 11, 2017, at 13:06, Dong Lin wrote:
> Hey Colin,
> 
> I went over the latest KIP wiki and have a few comments here.
> 
> 1) The KIP says that client ID is a string if the session belongs to a
> Kafka consumer. And it is a numerical follower Id if the session belongs
> to a follower. 

Hi Dong,

Right.  The issue is that replicas are identified by integers, whereas
consumers are identified by strings.

> Can we have a consistent type for the client Id?

We could use a string for both, perhaps?  Theoretically, a consumer
could also be named "broker 0" though, right?  So it would not be unique
any more.  Not sure what the best approach is here... what do you think?

> 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> broker". If the broker has multiple replica fetcher thread, do they all
> have the same follower Id in teh leader broker?

Yes, every session created by the same replica will have the same
replica ID.  The fetch session ID will be different for each thread,
though.

> 3) One of the condition for evicting an existing session is that "The new
> session belongs to a follower, and the existing session belongs to a
> regular consumer". I am not sure the session from follower should also be
> restricted by the newly added config. It seems that we will always create
> lots for FetchRequest from follower brokers. Maybe the
> "max.incremental.fetch.session.cache.slots" should only be applies if the
> FetchRequest comes from a client consumer?

Well, replicas may sometimes go down.  In that case, when they come back
up, they will create new fetch sessions.  So we need to be able to evict
the old fetch sessions from the cache.  In the course of normal
operation, though, replicas should always have incremental fetch
sessions, because they are prioritized over consumers.

> 
> 4) Not sure I fully understand how the "The last dirty sequence number"
> is
> used. It is mentioned that "Let P1 have a last dirty sequence number of
> 100, and P2 have a last dirty sequence number of 101. An incremental
> fetch
> request with sequence number 100 will return information about both P1
> and
> P2." But would be the fetch offset for P2 in this case, if the last fetch
> offset stored in the Fetch Session for P2 is associated with the last
> dirty
> sequence number 101 for P2? My gut feel is that you would have to stored
> the fetch offset for sequence number 100 for P2 as well. Did I miss
> something here?

It's OK to return the latest available data.  So if someone sends a
fetch request with seqno 100, and then you get new data, and then the
fetch request with seqno 100 gets resent, it's OK to include the new
data in the second response, even though it wasn't in the first
response.  We are NOT implementing snapshots here, or anything like
that.  We are just trying to guard against the "lost updates" case. 
Also, keep in mind that the fetch offset is not updated until it is sent
in a fetch request, just like now.

best,
Colin

> 
> Thanks,
> Dong
> 
> On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin 
> wrote:
> 
> > Hi Jun,
> >
> > I see. Yes, that makes sense. Are we going to do that only for the fetches
> > whose per partition fetch size cannot reach the first index entry after the
> > fetch position, or are we going to do that for any fetch? If we do that for
> > any fetch, then we will still need to read the actual log segment, which
> > could be expensive if the data is no longer in the cache. This hurts
> > performance if some fetches are on the old log segments.
> >
> > I took a quick look on the clusters we have. The idle topic ratio varies
> > depending on the usage of the cluster. For our metric cluster and database
> > replication clusters almost all the topics are actively used. For tracking
> > clusters, ~70% topics have data coming in at different rate. For other
> > clusters such as queuing and data deployment. There are more idle topics
> > and the traffic is more bursty (I don't have the exact number here).
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe  wrote:
> >
> > > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > > Hi, Jiangjie,
> > > >
> > > > What I described is almost the same as yours. The only extra thing is
> > to
> > > > scan the log segment from the identified index entry a bit more to
> > find a
> > > > file position that ends at a message set boundary and is less than the
> > > > partition level fetch size. This way, we still preserve the current
> > > > semantic of not returning more bytes than fetch size unless there is a
> > > > single message set larger than the fetch size.
> > > >
> > > > In a typically cluster at LinkedIn, what's the percentage of idle
> > > > partitions?
> > >
> > > Yeah, that would be a great number to get.
> > >
> > > Of course, KIP-227 will also benefit partitions that are not completely
> > > idle.  For instance, a partition that's 

[jira] [Created] (KAFKA-6346) Consolidate multiple background async log operations

2017-12-11 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6346:


 Summary: Consolidate multiple background async log operations
 Key: KAFKA-6346
 URL: https://issues.apache.org/jira/browse/KAFKA-6346
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Guozhang Wang


On Kafka broker's log managers, today we trigger a lot of repeating and 
one-time async background tasks, including:

1. log deletion based on retention policy (repeating).
  1.2. async log segment deletion triggered from 1 above (one-time)
2. log deletion based on replica stopping / log directory swapping (repeating)
3. log compaction, i.e. so-called "log cleaner thread" (repeating)
4. dirty log flushing
5. log recovery offsets checkpointing.
6. log starting offsets checkpointing.

I suspect all these background tasks are silently eating our CPU idle time 
unnecessarily. We should consider 1) benchmark their impacts on CPU cost and 2) 
if impact is high, consider consolidating some of these tasks into fewer async 
background threads.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2017-12-11 Thread Dong Lin
Hey Jun,

Thanks for the comment. I am open to improve this KIP to address more
problems. I probably need more help in understanding what is the current
problem with consumer using outdated metadata and whether it is easier to
address it together with this KIP.

I agree that a consumer can potentially talk to old leader for a long time
even after this KIP. But after this KIP, the consumer probably should not
get OffetOutofRangeException and therefore will not cause offset rewind
issue. So the only problem is that consumer will not be able to fetch data
until it has updated metadata. It seems that this situation can only happen
if the broker is too slow in processing LeaderAndIsrRequest since otherwise
the consumer will be forced to update metadata due to
NotLeaderForPartitionException. So the problem we are having here is that
consumer will not be able to fetch data if some broker is too slow in
processing LeaderAndIsrRequest.

Because Kafka propagates LeaderAndIsrRequest asynchronously to all brokers
in the cluster, there will always be a period of time when consumer can not
fetch data for the partition during the leadership change. Thus it seems
more like a broker-side performance issue instead of client-side
correctness issue. My gut feel is that it is not causing a much a problem
as the problem to be fixed in this KIP. And if we were to address it, we
probably need to make change in the broker side, e.g. with prioritized
queue for controller-related requests, which may be kind of orthogonal to
this KIP. I am not very sure it will be easier to address it with the
change in this KIP. Do you have any recommendation?

Thanks,
Dong


On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the reply.
>
> My suggestion of forcing the metadata refresh from the controller may not
> work in general since the cached controller could be outdated too. The
> general problem is that if a consumer's metadata is outdated, it may get
> stuck with the old leader for a long time. We can address the issue of
> detecting outdated metadata in a separate KIP in the future if you didn't
> intend to address it in this KIP.
>
> Thanks,
>
> Jun
>
>
> On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks much for your comments. Given that client needs to de-serialize
> the
> > metadata anyway, the extra overhead of checking the per-partition version
> > for every partition should not be a big concern. Thus it makes sense to
> use
> > leader epoch as the per-partition version instead of creating a global
> > metadata version. I will update the KIP to do that.
> >
> > Regarding the detection of outdated metadata, I think it is possible to
> > ensure that client gets latest metadata by fetching from controller. Note
> > that this requires extra logic in the controller such that controller
> > updates metadata directly in memory without requiring
> > UpdateMetadataRequest. But I am not sure the main motivation of this at
> > this moment. But this makes controller more like a bottleneck in the
> > cluster which we probably want to avoid.
> >
> > I think we can probably keep the current way of ensuring metadata
> > freshness. Currently client will be forced to refresh metadata if broker
> > returns error (e.g. NotLeaderForPartition) due to outdated metadata or if
> > the metadata does not contain the partition that the client needs. In the
> > future, as you previously suggested, we can include per-partition
> > leaderEpoch in the FetchRequest/ProduceRequest such that broker can
> return
> > error if the epoch is smaller than cached epoch in the broker. Given that
> > this adds more complexity to Kafka, I think we can probably think about
> > that leader when we have a specific use-case or problem to solve with
> > up-to-date metadata. Does this sound OK?
> >
> > Thanks,
> > Dong
> >
> >
> >
> > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the reply. A few more points below.
> > >
> > > For dealing with how to prevent a consumer switching from a new leader
> to
> > > an old leader, you suggestion that refreshes metadata on consumer
> restart
> > > until it sees a metadata version >= the one associated with the offset
> > > works too, as long as we guarantee that the cached metadata versions on
> > the
> > > brokers only go up.
> > >
> > > The second discussion point is on whether the metadata versioning
> should
> > be
> > > per partition or global. For the partition level versioning, you were
> > > concerned about the performance. Given that metadata updates are rare,
> I
> > am
> > > not sure if it's a big concern though. Doing a million if tests is
> > probably
> > > going to take less than 1ms. Another thing is that the metadata version
> > > seems to need to survive controller failover. In your current
> approach, a
> > > consumer may not be able to wait on the right version of the 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Colin McCabe
On Mon, Dec 11, 2017, at 13:17, Dong Lin wrote:
> On Thu, Dec 7, 2017 at 1:52 PM, Colin McCabe  wrote:
> 
> > On Wed, Dec 6, 2017, at 11:23, Becket Qin wrote:
> > > Hi Colin,
> > >
> > > >A full fetch request will certainly avoid any ambiguity here.  But now
> > > >we're back to sending full fetch requests whenever there are network
> > > >issues, which is worse than the current proposal.  And has the
> > > >congestion collapse problem I talked about earlier when the network is
> > > >wobbling.  We also don't get the other debuggability benefits of being
> > > >able to uniquely associate each update in the incremental fetch session
> > > >with a sequence number.
> > >
> > > I think we would want to optimize for the normal case instead of the
> > > failure case. The failure case is supposed to be rare and if that happens
> > > usually it requires human attention to fix anyways. So reducing the
> > > regular cost in the normal cases probably makes more sense.
> >
> 
> 
> Hmm.. let me chime in and ask a quick question on this.
> 
> My understanding of Becket's proposal is that the FetchRequest will not
> contain per-partition information in the normal cases. According to the
> latest KIP, it is said that "Incremental FetchRequests will only contain
> information about partitions which have changed on the follower". So if
> there is always data available for every partition on the broker, the
> FetchRequest will always contain per-partition information for every
> partition, which makes it essentially a full FetchRequest in normal case.
> Did I miss something here?

Hi Dong,

I think your understanding is correct.  The KIP-227 proposal includes
information about changed partitions in the partition fetch request.  If
every partition has changed, every partition will be included.

I don't think this is a problem.  For one thing, if every partition has
changed, then every partition will have data, which means you will have
a really large FetchResponse.  In that case, most of your network
bandwidth goes to the response anyway, rather than to the request.  And
you cannot get rid of that overhead, because you actually need to fetch
that data.

In any case, I am very skeptical that clusters that have information for
every partition on every fetch request exist in the wild. Remember that,
by default, we return a response to the fetch request when any partition
gets even a single byte of data.  Let's say you have 10,000 partitions. 
Do you have 10,000 produce requests being handled to each partition in
between each fetch request?  All the time?  So that somehow you can
service 10,000 Produce RPCs before you send back the response to a
single pending FetchRequest?  That's not very believable, especially
when you start thinking about internal topics.

I think the only way you could get reasonably close to a true fully
loaded fetch response is if you tuned Kafka for high latency and high
bandwidth.  So you could increase the wait time before sending back any
responses, and increase the minimum response size.  But that's not the
scenario we're addressing here.

best,
Colin

> 
> 
> 
> > >
> > > Thanks,
> >
> > Hi Becket,
> >
> > I agree we should optimize for the normal case.  I believe that the
> > sequence number proposal I put forward does this.  All the competing
> > proposals have been strictly worse for both the normal and error cases.
> > For example, the proposal to rely on the TCP session to establish
> > ordering does not help the normal case.  But it does make the case where
> > there are network issues worse.  It also makes it harder for us to put a
> > limit on the amount of time we will cache, which is worse for the normal
> > case.
> >
> > best,
> > Colin
> >
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Wed, Dec 6, 2017 at 10:58 AM, Colin McCabe 
> > wrote:
> > >
> > > > On Wed, Dec 6, 2017, at 10:49, Jason Gustafson wrote:
> > > > > >
> > > > > > There is already a way in the existing proposal for clients to
> > change
> > > > > > the set of partitions they are interested in, while re-using their
> > same
> > > > > > session and session ID.  We don't need to change how sequence ID
> > works
> > > > > > in order to do this.
> > > > >
> > > > >
> > > > > There is some inconsistency in the KIP about this, so I wasn't sure.
> > In
> > > > > particular, you say this: " The FetchSession maintains information
> > about
> > > > > a specific set of relevant partitions.  Note that the set of relevant
> > > > > partitions is established when the FetchSession is created.  It
> > cannot be
> > > > > changed later." Maybe that could be clarified?
> > > >
> > > > That's a fair point-- I didn't fix this part of the KIP after making an
> > > > update below.  So it was definitely unclear.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > >
> > > > > > But how does the broker know that it needs to resend the data for
> > > > > > partition P?  After all, if the response 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Becket Qin
Hi Jun,

Yes, I agree avoiding reading the log segment is not the primary goal for
this KIP. I brought this up because recently I saw a significant throughput
impact when a broker is down for 20 - 30 min and rejoins a cluster. The
bytes in rate could drop by 50% when that broker is trying to catch up with
the leaders even in a big cluster (a single broker should not have such big
impact on the entire cluster). And some users also reported such cascading
degradation, i.e. when one consumer lags behind, the other consumers will
also start to lag behind. So I think addressing this is an important
improvement. I will run some test and see if returning at index boundary to
avoid the log scan would help address this issue. That being said, I agree
that we don't have to address this issue in this KIP. I can submit another
KIP later if avoiding the log segment scan helps.

Thanks,

Jiangjie (Becket) Qin

On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin  wrote:

> Hey Colin,
>
> I went over the latest KIP wiki and have a few comments here.
>
> 1) The KIP says that client ID is a string if the session belongs to a
> Kafka consumer. And it is a numerical follower Id if the session belongs to
> a follower. Can we have a consistent type for the client Id?
>
> 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> broker". If the broker has multiple replica fetcher thread, do they all
> have the same follower Id in teh leader broker?
>
> 3) One of the condition for evicting an existing session is that "The new
> session belongs to a follower, and the existing session belongs to a
> regular consumer". I am not sure the session from follower should also be
> restricted by the newly added config. It seems that we will always create
> lots for FetchRequest from follower brokers. Maybe the
> "max.incremental.fetch.session.cache.slots" should only be applies if the
> FetchRequest comes from a client consumer?
>
> 4) Not sure I fully understand how the "The last dirty sequence number" is
> used. It is mentioned that "Let P1 have a last dirty sequence number of
> 100, and P2 have a last dirty sequence number of 101. An incremental fetch
> request with sequence number 100 will return information about both P1 and
> P2." But would be the fetch offset for P2 in this case, if the last fetch
> offset stored in the Fetch Session for P2 is associated with the last dirty
> sequence number 101 for P2? My gut feel is that you would have to stored
> the fetch offset for sequence number 100 for P2 as well. Did I miss
> something here?
>
> Thanks,
> Dong
>
> On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin  wrote:
>
> > Hi Jun,
> >
> > I see. Yes, that makes sense. Are we going to do that only for the
> fetches
> > whose per partition fetch size cannot reach the first index entry after
> the
> > fetch position, or are we going to do that for any fetch? If we do that
> for
> > any fetch, then we will still need to read the actual log segment, which
> > could be expensive if the data is no longer in the cache. This hurts
> > performance if some fetches are on the old log segments.
> >
> > I took a quick look on the clusters we have. The idle topic ratio varies
> > depending on the usage of the cluster. For our metric cluster and
> database
> > replication clusters almost all the topics are actively used. For
> tracking
> > clusters, ~70% topics have data coming in at different rate. For other
> > clusters such as queuing and data deployment. There are more idle topics
> > and the traffic is more bursty (I don't have the exact number here).
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe 
> wrote:
> >
> > > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > > Hi, Jiangjie,
> > > >
> > > > What I described is almost the same as yours. The only extra thing is
> > to
> > > > scan the log segment from the identified index entry a bit more to
> > find a
> > > > file position that ends at a message set boundary and is less than
> the
> > > > partition level fetch size. This way, we still preserve the current
> > > > semantic of not returning more bytes than fetch size unless there is
> a
> > > > single message set larger than the fetch size.
> > > >
> > > > In a typically cluster at LinkedIn, what's the percentage of idle
> > > > partitions?
> > >
> > > Yeah, that would be a great number to get.
> > >
> > > Of course, KIP-227 will also benefit partitions that are not completely
> > > idle.  For instance, a partition that's getting just one message a
> > > second will appear in many fetch requests, unless every other partition
> > > in the system is also only getting a low rate of incoming messages.
> > >
> > > regards,
> > > Colin
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin 
> > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > 

Build failed in Jenkins: kafka-0.11.0-jdk7 #345

2017-12-11 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Increase number of messages in replica verification tool test

--
[...truncated 2.61 MB...]

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartConnectorLeaderRedirect STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartConnectorLeaderRedirect PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartConnectorOwnerRedirect STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartConnectorOwnerRedirect PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskNotFound STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskNotFound PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskLeaderRedirect STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskLeaderRedirect PASSED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskOwnerRedirect STARTED

org.apache.kafka.connect.runtime.rest.resources.ConnectorsResourceTest > 
testRestartTaskOwnerRedirect PASSED

org.apache.kafka.connect.runtime.rest.RestServerTest > testCORSEnabled STARTED

org.apache.kafka.connect.runtime.rest.RestServerTest > testCORSEnabled PASSED

org.apache.kafka.connect.runtime.rest.RestServerTest > testCORSDisabled STARTED

org.apache.kafka.connect.runtime.rest.RestServerTest > testCORSDisabled PASSED

org.apache.kafka.connect.runtime.rest.entities.ConnectorTypeTest > 
testToStringIsLowerCase STARTED

org.apache.kafka.connect.runtime.rest.entities.ConnectorTypeTest > 
testToStringIsLowerCase PASSED

org.apache.kafka.connect.runtime.rest.entities.ConnectorTypeTest > testForValue 
STARTED

org.apache.kafka.connect.runtime.rest.entities.ConnectorTypeTest > testForValue 
PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigCast STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigCast PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigRegexRouter STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigRegexRouter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigSetSchemaMetadata STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigSetSchemaMetadata PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigTimestampConverter STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigTimestampConverter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigHoistField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigHoistField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigMaskField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigMaskField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigInsertField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigInsertField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigFlatten STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigFlatten PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigReplaceField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigReplaceField PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigTimestampRouter STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigTimestampRouter PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigValueToKey STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigValueToKey PASSED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigExtractField STARTED

org.apache.kafka.connect.runtime.TransformationConfigTest > 
testEmbeddedConfigExtractField PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > unconfiguredTransform 
STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > unconfiguredTransform 
PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > 
multipleTransformsOneDangling STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > 
multipleTransformsOneDangling PASSED

org.apache.kafka.connect.runtime.ConnectorConfigTest > misconfiguredTransform 
STARTED

org.apache.kafka.connect.runtime.ConnectorConfigTest > 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Dong Lin
Hey Colin,

I went over the latest KIP wiki and have a few comments here.

1) The KIP says that client ID is a string if the session belongs to a
Kafka consumer. And it is a numerical follower Id if the session belongs to
a follower. Can we have a consistent type for the client Id?

2) "The numeric follower ID, if this fetch session belongs to a Kafka
broker". If the broker has multiple replica fetcher thread, do they all
have the same follower Id in teh leader broker?

3) One of the condition for evicting an existing session is that "The new
session belongs to a follower, and the existing session belongs to a
regular consumer". I am not sure the session from follower should also be
restricted by the newly added config. It seems that we will always create
lots for FetchRequest from follower brokers. Maybe the
"max.incremental.fetch.session.cache.slots" should only be applies if the
FetchRequest comes from a client consumer?

4) Not sure I fully understand how the "The last dirty sequence number" is
used. It is mentioned that "Let P1 have a last dirty sequence number of
100, and P2 have a last dirty sequence number of 101. An incremental fetch
request with sequence number 100 will return information about both P1 and
P2." But would be the fetch offset for P2 in this case, if the last fetch
offset stored in the Fetch Session for P2 is associated with the last dirty
sequence number 101 for P2? My gut feel is that you would have to stored
the fetch offset for sequence number 100 for P2 as well. Did I miss
something here?

Thanks,
Dong

On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin  wrote:

> Hi Jun,
>
> I see. Yes, that makes sense. Are we going to do that only for the fetches
> whose per partition fetch size cannot reach the first index entry after the
> fetch position, or are we going to do that for any fetch? If we do that for
> any fetch, then we will still need to read the actual log segment, which
> could be expensive if the data is no longer in the cache. This hurts
> performance if some fetches are on the old log segments.
>
> I took a quick look on the clusters we have. The idle topic ratio varies
> depending on the usage of the cluster. For our metric cluster and database
> replication clusters almost all the topics are actively used. For tracking
> clusters, ~70% topics have data coming in at different rate. For other
> clusters such as queuing and data deployment. There are more idle topics
> and the traffic is more bursty (I don't have the exact number here).
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe  wrote:
>
> > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > Hi, Jiangjie,
> > >
> > > What I described is almost the same as yours. The only extra thing is
> to
> > > scan the log segment from the identified index entry a bit more to
> find a
> > > file position that ends at a message set boundary and is less than the
> > > partition level fetch size. This way, we still preserve the current
> > > semantic of not returning more bytes than fetch size unless there is a
> > > single message set larger than the fetch size.
> > >
> > > In a typically cluster at LinkedIn, what's the percentage of idle
> > > partitions?
> >
> > Yeah, that would be a great number to get.
> >
> > Of course, KIP-227 will also benefit partitions that are not completely
> > idle.  For instance, a partition that's getting just one message a
> > second will appear in many fetch requests, unless every other partition
> > in the system is also only getting a low rate of incoming messages.
> >
> > regards,
> > Colin
> >
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin 
> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Yes, we still need to handle the corner case. And you are right, it
> is
> > all
> > > > about trade-off between simplicity and the performance gain.
> > > >
> > > > I was thinking that the brokers always return at least
> > > > log.index.interval.bytes per partition to the consumer, just like we
> > will
> > > > return at least one message to the user. This way we don't need to
> > worry
> > > > about the case that the fetch size is smaller than the index
> interval.
> > We
> > > > may just need to let users know this behavior change.
> > > >
> > > > Not sure if I completely understand your solution, but I think we are
> > > > thinking about the same. i.e. for the first fetch asking for offset
> > x0, we
> > > > will need to do a binary search to find the position p0. and then the
> > > > broker will iterate over the index entries starting from the first
> > index
> > > > entry whose offset is greater than p0 until it reaches the index
> > entry(x1,
> > > > p1) so that p1 - p0 is just under the fetch size, but the next entry
> > will
> > > > exceed the fetch size. We then return the bytes from p0 to p1.
> > Meanwhile
> > > > the broker caches the next fetch (x1, 

Re: [DISCUSS] KIP-234: add support for getting topic defaults from AdminClient

2017-12-11 Thread dan
Dong,

I agree that it *may* be better for a user to be explicit, however there
are a couple reasons they may not.
1) a user doesn't even know what the options are. imagine writing a tool
for users to create topics that steps them through things:

$ kafka-topics.sh --create
Give your topic a name: my-fav-topic
How many partitions do you want [12]:
What is the minimum in set replica size [2]:
What is the maximum message size [1MB]:
...

2) a user wants to use broker defaults within reason. say they thinke they
want min.cleanable.dirty.ratio=.5 and the default is .6. maybe thats fine,
or even better for them. since the person maintaining the actual cluster
has put thought in to this config. and as the maintainer keeps working on
making the cluster run better they can change and tune things on the
cluster level as needed.

dan


On Wed, Dec 6, 2017 at 11:51 AM, Dong Lin  wrote:

> Hey Dan,
>
> I think you are saying that, if user can read the default config before
> creating the topic, then this user can make better decision in what configs
> need to be overwritten. The question here is, how user is going to use this
> config to make the decision.
>
> In my understanding, user will compare the default value with expected
> value, and override the config to be expected value if they are different.
> If this is the only way that the default value can affect user's decision,
> then it seems OK for user to directly override the config to the expected
> value. I am wondering if this solution has some drawback.
>
> On the other hand, maybe there is a more advanced way that the default
> value can affect the user's decision. It may be useful to understand this
> use-case more specifically. Could you help provide a specific example here?
>
> Thanks,
> Dong
>
>
> On Wed, Dec 6, 2017 at 11:12 AM, dan  wrote:
>
> > Rajini,
> >
> > that was not my intent, the intent was to give a user of this api an
> > insight in to what their topic will look like once created. as things
> stand
> > now a user is unable to (easily) have any knowledge of what their topic
> > configs will be before doing a `CREATE_TOPICS`. as i mentioned in the
> KIP,
> > another option would be to have the `CreateTopicsOptions.
> > validateOnly=true`
> > version return data, but seems more invasive/questionable.
> >
> > dan
> >
> > On Wed, Dec 6, 2017 at 5:10 AM, Rajini Sivaram 
> > wrote:
> >
> > > Hi Dan,
> > >
> > > Thank you for the KIP. KIP-226 (https://cwiki.apache.org/
> > > confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration)
> > proposes
> > > to add an option to include all synonyms of a config option when
> > describing
> > > configs. This includes any defaults. For example (using Dong's
> example),
> > if
> > > you have topicA with min.cleanable.dirty.ratio=0.6 as an override and
> the
> > > broker default is 0.5, AdminClient#describeConfigs with synonyms would
> > > return the actual value in use as the config value (I,e.
> > > min.cleanable.dirty.ratio=0.6). And the synonyms list would contain
> (in
> > > the
> > > order of precedence in which these configs are applied):
> > >
> > >1. min.cleanable.dirty.ratio=0.6, config source=TOPIC_CONFIG
> > >2. log.min.cleanable.dirty.ratio=0.5, config
> > > source=STATIC_BROKER_CONFIG
> > >
> > >
> > > KIP-226 doesn't give you exactly what you are proposing in this KIP,
> but
> > it
> > > gives the mapping of configs. My concern with this KIP is that it
> assumes
> > > that if the broker config is static, i.e. if the current value of
> > > log.min.cleanable.dirty.ratio=0.6, you can safely create a topic with
> > > default min.cleanable.dirty.ratio relying on that the value to be
> applied
> > > all the time. This will not work with dynamic broker configs if the
> > broker
> > > defaults can be updated at any time.
> > >
> > >
> > > On Mon, Dec 4, 2017 at 11:22 PM, dan  wrote:
> > >
> > > > for point 1 i agree, its not too strong. only addition i could come
> up
> > > with
> > > > is that it allows any utility to have better forwards compatability.
> a
> > > cli
> > > > written that can inspect how a topic will be created would be able to
> > > give
> > > > insight/expectations about configs that didn't exist at compilation
> > time.
> > > >
> > > > for point 2, i am thinking about a situation where the user creating
> > > topics
> > > > and the user managing brokers are different people with different
> > > > skills/abilities.
> > > >
> > > > so in the given example lets assume a user creating the topic does
> not
> > > > *really* understand what that config means, so they are likely to
> just
> > > > leave it as default. and are happy for their admin to change these on
> > the
> > > > broker as needed.
> > > >
> > > > but say we have another user who is creating a topic who has much
> more
> > > > experience and has done testing, they will be able to determine what
> > the
> > > > value is 

Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2017-12-11 Thread Jun Rao
Hi, Dong,

Thanks for the reply.

My suggestion of forcing the metadata refresh from the controller may not
work in general since the cached controller could be outdated too. The
general problem is that if a consumer's metadata is outdated, it may get
stuck with the old leader for a long time. We can address the issue of
detecting outdated metadata in a separate KIP in the future if you didn't
intend to address it in this KIP.

Thanks,

Jun


On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin  wrote:

> Hey Jun,
>
> Thanks much for your comments. Given that client needs to de-serialize the
> metadata anyway, the extra overhead of checking the per-partition version
> for every partition should not be a big concern. Thus it makes sense to use
> leader epoch as the per-partition version instead of creating a global
> metadata version. I will update the KIP to do that.
>
> Regarding the detection of outdated metadata, I think it is possible to
> ensure that client gets latest metadata by fetching from controller. Note
> that this requires extra logic in the controller such that controller
> updates metadata directly in memory without requiring
> UpdateMetadataRequest. But I am not sure the main motivation of this at
> this moment. But this makes controller more like a bottleneck in the
> cluster which we probably want to avoid.
>
> I think we can probably keep the current way of ensuring metadata
> freshness. Currently client will be forced to refresh metadata if broker
> returns error (e.g. NotLeaderForPartition) due to outdated metadata or if
> the metadata does not contain the partition that the client needs. In the
> future, as you previously suggested, we can include per-partition
> leaderEpoch in the FetchRequest/ProduceRequest such that broker can return
> error if the epoch is smaller than cached epoch in the broker. Given that
> this adds more complexity to Kafka, I think we can probably think about
> that leader when we have a specific use-case or problem to solve with
> up-to-date metadata. Does this sound OK?
>
> Thanks,
> Dong
>
>
>
> On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the reply. A few more points below.
> >
> > For dealing with how to prevent a consumer switching from a new leader to
> > an old leader, you suggestion that refreshes metadata on consumer restart
> > until it sees a metadata version >= the one associated with the offset
> > works too, as long as we guarantee that the cached metadata versions on
> the
> > brokers only go up.
> >
> > The second discussion point is on whether the metadata versioning should
> be
> > per partition or global. For the partition level versioning, you were
> > concerned about the performance. Given that metadata updates are rare, I
> am
> > not sure if it's a big concern though. Doing a million if tests is
> probably
> > going to take less than 1ms. Another thing is that the metadata version
> > seems to need to survive controller failover. In your current approach, a
> > consumer may not be able to wait on the right version of the metadata
> after
> > the consumer restart since the metadata version may have been recycled on
> > the server side due to a controller failover while the consumer is down.
> > The partition level leaderEpoch survives controller failure and won't
> have
> > this issue.
> >
> > Lastly, neither your proposal nor mine addresses the issue how to
> guarantee
> > a consumer to detect that is metadata is outdated. Currently, the
> consumer
> > is not guaranteed to fetch metadata from every broker within some bounded
> > period of time. Maybe this is out of the scope of your KIP. But one idea
> is
> > force the consumer to refresh metadata from the controller periodically.
> >
> > Jun
> >
> >
> > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks much for the comments. Great point particularly regarding (3). I
> > > haven't thought about this before.
> > >
> > > It seems that there are two possible ways where the version number can
> be
> > > used. One solution is for client to check the version number at the
> time
> > it
> > > receives MetadataResponse. And if the version number in the
> > > MetadataResponse is smaller than the version number in the client's
> > cache,
> > > the client will be forced to fetch metadata again.  Another solution,
> as
> > > you have suggested, is for broker to check the version number at the
> time
> > > it receives a request from client. The broker will reject the request
> if
> > > the version is smaller than the version in broker's cache.
> > >
> > > I am not very sure that the second solution can address the problem
> here.
> > > In the scenario described in the JIRA ticket, broker's cache may be
> > > outdated because it has not processed the LeaderAndIsrRequest from the
> > > controller. Thus it may still process client's request even if the
> > version
> > > in 

Re: [VOTE] KIP-218: Make KafkaFuture.Function java 8 lambda compatible

2017-12-11 Thread Ewen Cheslack-Postava
+1 (binding)

-Ewen

On Mon, Dec 11, 2017 at 12:40 PM, Gwen Shapira  wrote:

> +1 (binding) - nice API improvement, thanks for driving it!
>
> On Mon, Dec 11, 2017 at 11:52 AM Xavier Léauté 
> wrote:
>
> > Thanks Steven, I believe I addressed all the comments. If the it looks
> good
> > to you let's move forward on the vote.
> >
> > On Sat, Dec 9, 2017 at 12:50 AM Steven Aerts 
> > wrote:
> >
> > > Hello Xavier,
> > >
> > > for me it is perfect to take it along.
> > > I made a few small remarks in your PR.
> > >
> > > Thanks
> > >
> > > Op za 9 dec. 2017 om 01:29 schreef Xavier Léauté  >:
> > >
> > > > Hi Steve, I just posted in the discussion thread, there's just one
> tiny
> > > fix
> > > > I think would be useful to add while we're making changes to this
> API.
> > > > Do you mind having a look?
> > > >
> > > > On Fri, Dec 8, 2017 at 11:37 AM Mickael Maison <
> > mickael.mai...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > +1 (non binding)
> > > > > Thanks for the KIP
> > > > >
> > > > > On Fri, Dec 8, 2017 at 6:53 PM, Tom Bentley  >
> > > > wrote:
> > > > > > +1
> > > > > >
> > > > > > On 8 December 2017 at 18:34, Ted Yu  wrote:
> > > > > >
> > > > > >> +1
> > > > > >>
> > > > > >> On Fri, Dec 8, 2017 at 3:49 AM, Steven Aerts <
> > > steven.ae...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hello everybody,
> > > > > >> >
> > > > > >> >
> > > > > >> > I think KIP-218 is crystallized enough to start voting.
> > > > > >> >
> > > > > >> > KIP documentation:
> > > > > >> >
> > > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > >> > 218%3A+Make+KafkaFuture.Function+java+8+lambda+compatible
> > > > > >> >
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> >
> > > > > >> >Steven
> > > > > >> >
> > > > > >>
> > > > >
> > > >
> > >
> >
>


[GitHub] kafka-site pull request #111: KAFKA-6334: fix typo in backwards compatibilit...

2017-12-11 Thread noslowerdna
GitHub user noslowerdna opened a pull request:

https://github.com/apache/kafka-site/pull/111

KAFKA-6334: fix typo in backwards compatibility note



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/noslowerdna/kafka-site KAFKA-6334

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka-site/pull/111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #111


commit b5da3a1d4563ce0455b5382979c90659e3ce1150
Author: Andrew Olson 
Date:   2017-12-11T21:33:42Z

KAFKA-6334: fix typo in backwards compatibility note




---


Build failed in Jenkins: kafka-trunk-jdk8 #2267

2017-12-11 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Increase number of messages in replica verification tool test

--
[...truncated 3.39 MB...]

kafka.utils.CoreUtilsTest > testCsvList STARTED

kafka.utils.CoreUtilsTest > testCsvList PASSED

kafka.utils.CoreUtilsTest > testReadInt STARTED

kafka.utils.CoreUtilsTest > testReadInt PASSED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate STARTED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate PASSED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.CoreUtilsTest > testCsvMap STARTED

kafka.utils.CoreUtilsTest > testCsvMap PASSED

kafka.utils.CoreUtilsTest > testInLock STARTED

kafka.utils.CoreUtilsTest > testInLock PASSED

kafka.utils.CoreUtilsTest > testSwallow STARTED

kafka.utils.CoreUtilsTest > testSwallow PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.json.JsonValueTest > testJsonObjectIterator STARTED

kafka.utils.json.JsonValueTest > testJsonObjectIterator PASSED

kafka.utils.json.JsonValueTest > testDecodeLong STARTED

kafka.utils.json.JsonValueTest > testDecodeLong PASSED

kafka.utils.json.JsonValueTest > testAsJsonObject STARTED

kafka.utils.json.JsonValueTest > testAsJsonObject PASSED

kafka.utils.json.JsonValueTest > testDecodeDouble STARTED

kafka.utils.json.JsonValueTest > testDecodeDouble PASSED

kafka.utils.json.JsonValueTest > testDecodeOption STARTED

kafka.utils.json.JsonValueTest > testDecodeOption PASSED

kafka.utils.json.JsonValueTest > testDecodeString STARTED

kafka.utils.json.JsonValueTest > testDecodeString PASSED

kafka.utils.json.JsonValueTest > testJsonValueToString STARTED

kafka.utils.json.JsonValueTest > testJsonValueToString PASSED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArray STARTED

kafka.utils.json.JsonValueTest > testAsJsonArray PASSED

kafka.utils.json.JsonValueTest > testJsonValueHashCode STARTED

kafka.utils.json.JsonValueTest > testJsonValueHashCode PASSED

kafka.utils.json.JsonValueTest > testDecodeInt STARTED

kafka.utils.json.JsonValueTest > testDecodeInt PASSED

kafka.utils.json.JsonValueTest > testDecodeMap STARTED

kafka.utils.json.JsonValueTest > testDecodeMap PASSED

kafka.utils.json.JsonValueTest > testDecodeSeq STARTED

kafka.utils.json.JsonValueTest > testDecodeSeq PASSED

kafka.utils.json.JsonValueTest > testJsonObjectGet STARTED

kafka.utils.json.JsonValueTest > testJsonObjectGet PASSED

kafka.utils.json.JsonValueTest > testJsonValueEquals STARTED

kafka.utils.json.JsonValueTest > testJsonValueEquals PASSED

kafka.utils.json.JsonValueTest > testJsonArrayIterator STARTED

kafka.utils.json.JsonValueTest > testJsonArrayIterator PASSED

kafka.utils.json.JsonValueTest > testJsonObjectApply STARTED

kafka.utils.json.JsonValueTest > testJsonObjectApply PASSED

kafka.utils.json.JsonValueTest > testDecodeBoolean STARTED

kafka.utils.json.JsonValueTest > testDecodeBoolean PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2017-12-11 Thread Jun Rao
Another question is on the compatibility. Since now there are 2 ways of
specifying a partition reassignment, one under /admin/reassign_partitions
and the other under /admin/reassignments, we probably want to prevent the
same topic being reassigned under both paths at the same time?
Thanks,

Jun



On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao  wrote:

> Hi, Tom,
>
> Thanks for the KIP. It definitely addresses one of the pain points in
> partition reassignment. Another issue that it also addresses is the ZK node
> size limit when writing the reassignment JSON.
>
> My only concern is that the KIP needs to create one watcher per reassigned
> partition. This could add overhead in ZK and complexity for debugging when
> lots of partitions are being reassigned simultaneously. We could
> potentially improve this by introducing a separate ZK path for change
> notification as we do for configs. For example, every time we change the
> assignment for a set of partitions, we could further write a sequential
> node /admin/reassignment_changes/[change_x]. That way, the controller
> only needs to watch the change path. Once a change is triggered, the
> controller can read everything under /admin/reassignments/.
>
> Jun
>
>
> On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley  wrote:
>
>> Hi,
>>
>> This is still very new, but I wanted some quick feedback on a preliminary
>> KIP which could, I think, help with providing an AdminClient API for
>> partition reassignment.
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%
>> 3A+Interruptible+Partition+Reassignment
>>
>> I wasn't sure whether to start fleshing out a whole AdminClient API in
>> this
>> KIP (which would make it very big, and difficult to read), or whether to
>> break it down into smaller KIPs (which makes it easier to read and
>> implement in pieces, but harder to get a high-level picture of the
>> ultimate
>> destination). For now I've gone for a very small initial KIP, but I'm
>> happy
>> to sketch the bigger picture here if people are interested.
>>
>> Cheers,
>>
>> Tom
>>
>
>


Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Jun Rao
Hi, Jiangjie,

Thanks for the info. I was thinking of doing the scan of the log segment on
every fetch request as we do today. The optimization for this KIP is
probably mostly useful for real time consumption, in which case the log
segments that need to be accessed are likely still in pagecache.

Jun

On Sun, Dec 10, 2017 at 11:15 PM, Becket Qin  wrote:

> Hi Jun,
>
> I see. Yes, that makes sense. Are we going to do that only for the fetches
> whose per partition fetch size cannot reach the first index entry after the
> fetch position, or are we going to do that for any fetch? If we do that for
> any fetch, then we will still need to read the actual log segment, which
> could be expensive if the data is no longer in the cache. This hurts
> performance if some fetches are on the old log segments.
>
> I took a quick look on the clusters we have. The idle topic ratio varies
> depending on the usage of the cluster. For our metric cluster and database
> replication clusters almost all the topics are actively used. For tracking
> clusters, ~70% topics have data coming in at different rate. For other
> clusters such as queuing and data deployment. There are more idle topics
> and the traffic is more bursty (I don't have the exact number here).
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sun, Dec 10, 2017 at 10:17 PM, Colin McCabe  wrote:
>
> > On Fri, Dec 8, 2017, at 16:56, Jun Rao wrote:
> > > Hi, Jiangjie,
> > >
> > > What I described is almost the same as yours. The only extra thing is
> to
> > > scan the log segment from the identified index entry a bit more to
> find a
> > > file position that ends at a message set boundary and is less than the
> > > partition level fetch size. This way, we still preserve the current
> > > semantic of not returning more bytes than fetch size unless there is a
> > > single message set larger than the fetch size.
> > >
> > > In a typically cluster at LinkedIn, what's the percentage of idle
> > > partitions?
> >
> > Yeah, that would be a great number to get.
> >
> > Of course, KIP-227 will also benefit partitions that are not completely
> > idle.  For instance, a partition that's getting just one message a
> > second will appear in many fetch requests, unless every other partition
> > in the system is also only getting a low rate of incoming messages.
> >
> > regards,
> > Colin
> >
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Dec 6, 2017 at 6:57 PM, Becket Qin 
> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Yes, we still need to handle the corner case. And you are right, it
> is
> > all
> > > > about trade-off between simplicity and the performance gain.
> > > >
> > > > I was thinking that the brokers always return at least
> > > > log.index.interval.bytes per partition to the consumer, just like we
> > will
> > > > return at least one message to the user. This way we don't need to
> > worry
> > > > about the case that the fetch size is smaller than the index
> interval.
> > We
> > > > may just need to let users know this behavior change.
> > > >
> > > > Not sure if I completely understand your solution, but I think we are
> > > > thinking about the same. i.e. for the first fetch asking for offset
> > x0, we
> > > > will need to do a binary search to find the position p0. and then the
> > > > broker will iterate over the index entries starting from the first
> > index
> > > > entry whose offset is greater than p0 until it reaches the index
> > entry(x1,
> > > > p1) so that p1 - p0 is just under the fetch size, but the next entry
> > will
> > > > exceed the fetch size. We then return the bytes from p0 to p1.
> > Meanwhile
> > > > the broker caches the next fetch (x1, p1). So when the next fetch
> > comes, it
> > > > will just iterate over the offset index entry starting at (x1, p1).
> > > >
> > > > It is true that in the above approach, the log compacted topic needs
> > to be
> > > > handled. It seems that this can be solved by checking whether the
> > cached
> > > > index and the new log index are still the same index object. If they
> > are
> > > > not the same, we can fall back to binary search with the cached
> > offset. It
> > > > is admittedly more complicated than the current logic, but given the
> > binary
> > > > search logic already exists, it seems the additional object sanity
> > check is
> > > > not too much work.
> > > >
> > > > Not sure if the above implementation is simple enough to justify the
> > > > performance improvement. Let me know if you see potential complexity.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Dec 6, 2017 at 4:48 PM, Jun Rao  wrote:
> > > >
> > > > > Hi, Becket,
> > > > >
> > > > > Yes, I agree that it's rare to have the fetch size smaller than
> index
> > > > > interval. It's just that we still need additional code to handle
> the
> > rare
> > > > > case.
> > > > >
> > 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-11 Thread Dong Lin
On Thu, Dec 7, 2017 at 1:52 PM, Colin McCabe  wrote:

> On Wed, Dec 6, 2017, at 11:23, Becket Qin wrote:
> > Hi Colin,
> >
> > >A full fetch request will certainly avoid any ambiguity here.  But now
> > >we're back to sending full fetch requests whenever there are network
> > >issues, which is worse than the current proposal.  And has the
> > >congestion collapse problem I talked about earlier when the network is
> > >wobbling.  We also don't get the other debuggability benefits of being
> > >able to uniquely associate each update in the incremental fetch session
> > >with a sequence number.
> >
> > I think we would want to optimize for the normal case instead of the
> > failure case. The failure case is supposed to be rare and if that happens
> > usually it requires human attention to fix anyways. So reducing the
> > regular cost in the normal cases probably makes more sense.
>


Hmm.. let me chime in and ask a quick question on this.

My understanding of Becket's proposal is that the FetchRequest will not
contain per-partition information in the normal cases. According to the
latest KIP, it is said that "Incremental FetchRequests will only contain
information about partitions which have changed on the follower". So if
there is always data available for every partition on the broker, the
FetchRequest will always contain per-partition information for every
partition, which makes it essentially a full FetchRequest in normal case.
Did I miss something here?



> >
> > Thanks,
>
> Hi Becket,
>
> I agree we should optimize for the normal case.  I believe that the
> sequence number proposal I put forward does this.  All the competing
> proposals have been strictly worse for both the normal and error cases.
> For example, the proposal to rely on the TCP session to establish
> ordering does not help the normal case.  But it does make the case where
> there are network issues worse.  It also makes it harder for us to put a
> limit on the amount of time we will cache, which is worse for the normal
> case.
>
> best,
> Colin
>
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Dec 6, 2017 at 10:58 AM, Colin McCabe 
> wrote:
> >
> > > On Wed, Dec 6, 2017, at 10:49, Jason Gustafson wrote:
> > > > >
> > > > > There is already a way in the existing proposal for clients to
> change
> > > > > the set of partitions they are interested in, while re-using their
> same
> > > > > session and session ID.  We don't need to change how sequence ID
> works
> > > > > in order to do this.
> > > >
> > > >
> > > > There is some inconsistency in the KIP about this, so I wasn't sure.
> In
> > > > particular, you say this: " The FetchSession maintains information
> about
> > > > a specific set of relevant partitions.  Note that the set of relevant
> > > > partitions is established when the FetchSession is created.  It
> cannot be
> > > > changed later." Maybe that could be clarified?
> > >
> > > That's a fair point-- I didn't fix this part of the KIP after making an
> > > update below.  So it was definitely unclear.
> > >
> > > best,
> > > Colin
> > >
> > > >
> > > >
> > > > > But how does the broker know that it needs to resend the data for
> > > > > partition P?  After all, if the response had not been dropped, P
> would
> > > > > not have been resent, since it didn't change.  Under the existing
> > > > > scheme, the follower can look at lastDirtyEpoch to find this out.
>  In
> > > > > the new scheme, I don't see how it would know.
> > > >
> > > >
> > > > If a fetch response is lost, the epoch would be bumped by the client
> and
> > > > a
> > > > full fetch would be sent. Doesn't that solve the issue?
> > > >
> > > > -Jason
> > > >
> > > > On Wed, Dec 6, 2017 at 10:40 AM, Colin McCabe 
> > > wrote:
> > > >
> > > > > On Wed, Dec 6, 2017, at 09:32, Jason Gustafson wrote:
> > > > > > >
> > > > > > > Thinking about this again. I do see the reason that we want to
> > > have a
> > > > > epoch
> > > > > > > to avoid out of order registration of the interested set. But
> I am
> > > > > > > wondering if the following semantic would meet what we want
> better:
> > > > > > >  - Session Id: the id assigned to a single client for life long
> > > time.
> > > > > i.e
> > > > > > > it does not change when the interested partitions change.
> > > > > > >  - Epoch: the interested set epoch. Only updated when a full
> fetch
> > > > > request
> > > > > > > comes, which may result in the interested partition set change.
> > > > > > > This will ensure that the registered interested set will
> always be
> > > the
> > > > > > > latest registration. And the clients can change the interested
> > > > > partition
> > > > > > > set without creating another session.
> > > > > >
> > > > > >
> > > > > > I agree this is a bit more intuitive than the sequence number
> and the
> > > > > > ability to reuse the session is beneficial since it causes less
> > > waste of
> > > > > > the cache for session timeouts.
> > > > 

Re: [VOTE] KIP-218: Make KafkaFuture.Function java 8 lambda compatible

2017-12-11 Thread Gwen Shapira
+1 (binding) - nice API improvement, thanks for driving it!

On Mon, Dec 11, 2017 at 11:52 AM Xavier Léauté  wrote:

> Thanks Steven, I believe I addressed all the comments. If the it looks good
> to you let's move forward on the vote.
>
> On Sat, Dec 9, 2017 at 12:50 AM Steven Aerts 
> wrote:
>
> > Hello Xavier,
> >
> > for me it is perfect to take it along.
> > I made a few small remarks in your PR.
> >
> > Thanks
> >
> > Op za 9 dec. 2017 om 01:29 schreef Xavier Léauté :
> >
> > > Hi Steve, I just posted in the discussion thread, there's just one tiny
> > fix
> > > I think would be useful to add while we're making changes to this API.
> > > Do you mind having a look?
> > >
> > > On Fri, Dec 8, 2017 at 11:37 AM Mickael Maison <
> mickael.mai...@gmail.com
> > >
> > > wrote:
> > >
> > > > +1 (non binding)
> > > > Thanks for the KIP
> > > >
> > > > On Fri, Dec 8, 2017 at 6:53 PM, Tom Bentley 
> > > wrote:
> > > > > +1
> > > > >
> > > > > On 8 December 2017 at 18:34, Ted Yu  wrote:
> > > > >
> > > > >> +1
> > > > >>
> > > > >> On Fri, Dec 8, 2017 at 3:49 AM, Steven Aerts <
> > steven.ae...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >> > Hello everybody,
> > > > >> >
> > > > >> >
> > > > >> > I think KIP-218 is crystallized enough to start voting.
> > > > >> >
> > > > >> > KIP documentation:
> > > > >> >
> > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > >> > 218%3A+Make+KafkaFuture.Function+java+8+lambda+compatible
> > > > >> >
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> >
> > > > >> >Steven
> > > > >> >
> > > > >>
> > > >
> > >
> >
>


[jira] [Resolved] (KAFKA-1530) howto update continuously

2017-12-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1530.
--
Resolution: Fixed

> howto update continuously
> -
>
> Key: KAFKA-1530
> URL: https://issues.apache.org/jira/browse/KAFKA-1530
> Project: Kafka
>  Issue Type: Wish
>Reporter: Stanislav Gilmulin
>Assignee: Guozhang Wang
>Priority: Minor
>  Labels: operating_manual, performance
>
> Hi,
>  
> Could I ask you a question about the Kafka update procedure?
> Is there a way to update software, which doesn't require service interruption 
> or lead to data losses?
> We can't stop message brokering during the update as we have a strict SLA.
>  
> Best regards
> Stanislav Gilmulin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-228 Negative record timestamp support

2017-12-11 Thread Konstantin Chukhlomin
Hi all, 

I've updated KIP with few more details:
Added (proposed) Changes in binary message format 

Added Changes from producer perspective 

Added Changes from consumer perspective 


Let me know if it makes sense to you.

-Konstantin

> On Dec 7, 2017, at 2:46 PM, Konstantin Chukhlomin  wrote:
> 
> Hi Matthias,
> 
> Indeed for consumers it will be not obvious what −1 means: actual timestamp
> or no timestamp. Nevertheless, it's just −1 millisecond, so I thought it will 
> be 
> not a big deal to leave it (not clean, but acceptable).
> 
> I agree that it will much cleaner to have a different type of topics that 
> support
> negative timestamp and/or threat Long.MIN_VALUE as a no-timestamp.
> I'll update KIP to make it a proposed solution.
> 
> Thanks,
> Konstantin
> 
>> On Dec 5, 2017, at 7:06 PM, Matthias J. Sax  wrote:
>> 
>> Thanks for the KIP Konstantin.
>> 
>> From my understanding, you propose to just remove the negative timestamp
>> check in KafkaProducer and KafkaStreams. If topics are configured with
>> `CreateTime` brokers also write negative timestamps if they are embedded
>> in the message.
>> 
>> However, I am not sure about the overlapping semantics for -1 timestamp.
>> My concerns is, that this ambiguity might result in issues. Assume that
>> there is a topic (configured with `CreateTime`) for which an old and a
>> new producer are writing. The old producer uses old message format and
>> does not include any timestamp in the message. The broker will "upgrade"
>> this message to the new format and set -1. At the same time, the new
>> producer could write a message with valid timestamp -1. A consumer could
>> not distinguish between both cases...
>> 
>> Also, there might be other Producer implementations that write negative
>> timestamps. Thus, those might already exist. For Streams, we don't
>> process those and we should make sure to keep it this way (to avoid
>> ambiguity).
>> 
>> Thus, it might actually make sense to introduce a new timestamp type to
>> express those new semantics. The question is still, how to deal with
>> older producer clients that want to write to those topics.
>> 
>> - We could either use `Long.MIN_VALUE` as "unknown" (this would be way
>> better than -1 as it's not in the middle of the range but at the very
>> end and it will also have well-defined semantics).
>> - Or we use a "mixed-mode" where we use broker wall-clock time for
>> older message formats (ie, append time semantics for older producers)
>> - Third, we would even give an error message back to older producers;
>> this might change the backward compatibility guarantees Kafka provides
>> so far when upgrading brokers. However, this would not affect exiting
>> topics, but only newly created ones (and we could disallow changing the
>> semantics to the new timestamp type to guard against miss
>> configuration). Thus, it might be ok.
>> 
>> For Streams, we could check the topic config and process negative
>> timestamps only if the topic is configures with the new timestamp type.
>> 
>> 
>> Maybe I am a little bit to paranoid about overloading -1 semantics.
>> Curious to get feedback from others.
>> 
>> 
>> 
>> -Matthias
>> 
>> 
>> On 12/5/17 1:24 PM, Konstantin Chukhlomin wrote:
>>> Hi Dong,
>>> 
>>> Currently we are storing historical timestamp in the message.
>>> 
>>> What we are trying to achieve is to make it possible to do Kafka lookup 
>>> by timestamp. Ideally I would do `offsetsForTimes` to find articles 
>>> published 
>>> in 1910s (if we are storing articles on the log).
>>> 
>>> So first two suggestions aren't really covering our use-case.
>>> 
>>> We could create a new timestamp type like "HistoricalTimestamp" or 
>>> "MaybeNegativeTimestamp".
>>> And the only difference between this one and CreateTime is that it could be 
>>> negative.
>>> I tend to use CreateTime for this purpose because it's easier to understand 
>>> from 
>>> user perspective as a timestamp which publisher can set.
>>> 
>>> Thanks,
>>> Konstantin
>>> 
 On Dec 5, 2017, at 3:47 PM, Dong Lin  wrote:
 
 Hey Konstantin,
 
 Thanks for the KIP. I have a few questions below.
 
 Strictly speaking Kafka actually allows you to store historical data. And
 user are free to encode arbitrary timestamp field in their Kafka message.
 For example, your Kafka message can currently have Json or Avro format and
 you can put a timestamp field there. Do you think that 

Re: [VOTE] KIP-218: Make KafkaFuture.Function java 8 lambda compatible

2017-12-11 Thread Xavier Léauté
Thanks Steven, I believe I addressed all the comments. If the it looks good
to you let's move forward on the vote.

On Sat, Dec 9, 2017 at 12:50 AM Steven Aerts  wrote:

> Hello Xavier,
>
> for me it is perfect to take it along.
> I made a few small remarks in your PR.
>
> Thanks
>
> Op za 9 dec. 2017 om 01:29 schreef Xavier Léauté :
>
> > Hi Steve, I just posted in the discussion thread, there's just one tiny
> fix
> > I think would be useful to add while we're making changes to this API.
> > Do you mind having a look?
> >
> > On Fri, Dec 8, 2017 at 11:37 AM Mickael Maison  >
> > wrote:
> >
> > > +1 (non binding)
> > > Thanks for the KIP
> > >
> > > On Fri, Dec 8, 2017 at 6:53 PM, Tom Bentley 
> > wrote:
> > > > +1
> > > >
> > > > On 8 December 2017 at 18:34, Ted Yu  wrote:
> > > >
> > > >> +1
> > > >>
> > > >> On Fri, Dec 8, 2017 at 3:49 AM, Steven Aerts <
> steven.ae...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hello everybody,
> > > >> >
> > > >> >
> > > >> > I think KIP-218 is crystallized enough to start voting.
> > > >> >
> > > >> > KIP documentation:
> > > >> >
> > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > >> > 218%3A+Make+KafkaFuture.Function+java+8+lambda+compatible
> > > >> >
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> >
> > > >> >Steven
> > > >> >
> > > >>
> > >
> >
>


[jira] [Created] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2017-12-11 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6345:
---

 Summary: NetworkClient.inFlightRequestCount() is not thread safe, 
causing ConcurrentModificationExceptions when sensors are read
 Key: KAFKA-6345
 URL: https://issues.apache.org/jira/browse/KAFKA-6345
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
Reporter: radai rosenblatt


example stack trace (code is ~0.10.2.*)
{code}
java.util.ConcurrentModificationException: 
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
at 
org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
at 
org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
at 
org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
{code}

looking at latest trunk, the code is still vulnerable:
# NetworkClient.inFlightRequestCount() eventually iterates over 
InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
HashMap
# this will be called from the "requests-in-flight" sensor's measure() method 
(Sender.java line  ~765 in SenderMetrics ctr), which would be driven by some 
thread reading JMX values
# HashMap in question would also be updated by some client io thread calling 
NetworkClient.doSend() - which calls into InFlightRequests.add())

i guess the only upside is that this exception will always happen on the thread 
reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1561) Data Loss for Incremented Replica Factor and Leader Election

2017-12-11 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1561.
--
Resolution: Fixed

> Data Loss for Incremented Replica Factor and Leader Election
> 
>
> Key: KAFKA-1561
> URL: https://issues.apache.org/jira/browse/KAFKA-1561
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
> Attachments: broker0.log, broker2.log, consumer.log, producer.log
>
>
> This is reported on the mailing list (thanks to Jad).
> {quote}
> Hi,
> I have a test that continuously sends messages to one broker, brings up
> another broker, and adds it as a replica for all partitions, with it being
> the preferred replica for some. I have auto.leader.rebalance.enable=true,
> so replica election gets triggered. Data is being pumped to the old broker
> all the while. It seems that some data gets lost while switching over to
> the new leader. Is this a bug, or do I have something misconfigured? I also
> have request.required.acks=-1 on the producer.
> Here's what I think is happening:
> 1. Producer writes message to broker 0, [EventServiceUpsertTopic,13], w/
> broker 0 currently leader, with ISR=(0), so write returns successfully,
> even when acks = -1. Correlation id 35836
> Producer log:
> [2014-07-24 14:44:26,991]  [DEBUG]  [dw-97 - PATCH
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> [kafka.producer.BrokerPartitionInfo]  Partition
> [EventServiceUpsertTopic,13] has leader 0
> [2014-07-24 14:44:26,993]  [DEBUG]  [dw-97 - PATCH
> /v1/events/type_for_test_bringupNewBroker_shouldRebalance_shouldNotLoseData/event?_idPath=idField&_mergeFields=field1]
> [k.producer.async.DefaultEventHandler]  Producer sent messages with
> correlation id 35836 for topics [EventServiceUpsertTopic,13] to broker 0 on
> localhost:56821
> 2. Broker 1 is still catching up
> Broker 0 Log:
> [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker
> 0: Old hw for partition [EventServiceUpsertTopic,13] is 971. New hw is 971.
> All leo's are 975,971
> [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-request-handler-3]
> [kafka.server.KafkaApis]  [KafkaApi-0] Produce to local log in 0 ms
> [2014-07-24 14:44:26,992]  [DEBUG]  [kafka-processor-56821-0]
> [kafka.request.logger]  Completed request:Name: ProducerRequest; Version:
> 0; CorrelationId: 35836; ClientId: ; RequiredAcks: -1; AckTimeoutMs: 1
> ms from client /127.0.0.1:57086
> ;totalTime:0,requestQueueTime:0,localTime:0,remoteTime:0,responseQueueTime:0,sendTime:0
> 3. Leader election is triggered by the scheduler:
> Broker 0 Log:
> [2014-07-24 14:44:26,991]  [INFO ]  [kafka-scheduler-0]
> [k.c.PreferredReplicaPartitionLeaderSelector]
> [PreferredReplicaPartitionLeaderSelector]: Current leader 0 for partition [
> EventServiceUpsertTopic,13] is not the preferred replica. Trigerring
> preferred replica leader election
> [2014-07-24 14:44:26,993]  [DEBUG]  [kafka-scheduler-0]
> [kafka.utils.ZkUtils$]  Conditional update of path
> /brokers/topics/EventServiceUpsertTopic/partitions/13/state with value
> {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":3,"isr":[0,1]}
> and expected version 3 succeeded, returning the new version: 4
> [2014-07-24 14:44:26,994]  [DEBUG]  [kafka-scheduler-0]
> [k.controller.PartitionStateMachine]  [Partition state machine on
> Controller 0]: After leader election, leader cache is updated to
> Map((Leader:1,ISR:0,1,LeaderEpoch:3,ControllerEpoch:1),)
> [2014-07-24 14:44:26,994]  [INFO ]  [kafka-scheduler-0]
> [kafka.controller.KafkaController]  [Controller 0]: Partition [
> EventServiceUpsertTopic,13] completed preferred replica leader election.
> New leader is 1
> 4. Broker 1 is still behind, but it sets the high water mark to 971!!!
> Broker 1 Log:
> [2014-07-24 14:44:26,999]  [INFO ]  [kafka-request-handler-6]
> [kafka.server.ReplicaFetcherManager]  [ReplicaFetcherManager on broker 1]
> Removed fetcher for partitions [EventServiceUpsertTopic,13]
> [2014-07-24 14:44:27,000]  [DEBUG]  [kafka-request-handler-6]
> [kafka.cluster.Partition]  Partition [EventServiceUpsertTopic,13] on broker
> 1: Old hw for partition [EventServiceUpsertTopic,13] is 970. New hw is -1.
> All leo's are -1,971
> [2014-07-24 14:44:27,098]  [DEBUG]  [kafka-request-handler-3]
> [kafka.server.KafkaApis]  [KafkaApi-1] Maybe update partition HW due to
> fetch request: Name: FetchRequest; Version: 0; CorrelationId: 1; ClientId:
> ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [EventServiceUpsertTopic,13] ->
> PartitionFetchInfo(971,1048576), 
> [2014-07-24 14:44:27,098]  

[GitHub] kafka pull request #4312: MINOR: Increase number of messages in replica veri...

2017-12-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4312


---


[GitHub] kafka pull request #4312: MINOR: Increase number of messages in replica veri...

2017-12-11 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/4312

MINOR: Increase number of messages in replica verification tool test

Increase the number of messages produced to make the test more reliable. 
The test failed in a recent build and also fails intermittently when run 
locally. Since the producer uses acks=0 and the test stops as soon as a lag is 
observed, the change shouldn't have a big impact on the time taken to run when 
lag is observed sooner.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka 
MINOR-replicaverification-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4312.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4312


commit 94ef402fceec0df1a10ddc8a0f435c8c68a8e6c3
Author: Rajini Sivaram 
Date:   2017-12-11T16:58:30Z

MINOR: Increase number of messages in replica verification tool test




---


Jenkins build is back to normal : kafka-trunk-jdk8 #2266

2017-12-11 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk9 #248

2017-12-11 Thread Apache Jenkins Server
See 




[GitHub] kafka pull request #2944: KAFKA-3940: Replaced File.mkdir/mkdirs/delete by t...

2017-12-11 Thread mimaison
Github user mimaison closed the pull request at:

https://github.com/apache/kafka/pull/2944


---


[GitHub] kafka pull request #4311: KAFKA-6298 - Line numbers on log messages are inco...

2017-12-11 Thread mrnakumar
GitHub user mrnakumar opened a pull request:

https://github.com/apache/kafka/pull/4311

KAFKA-6298 - Line numbers on log messages are incorrect

Modified LogContext.KafkaLogger to add support for location aware logging. 
If LocationAwareLogger is not available then fallback to the Logger. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mrnakumar/kafka 
KAFKA-6298-LOCATION-AWARE-LOGGING

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4311


commit a8900b147e01384eaab84974f4fc11b530ca5490
Author: Narendra kumar 
Date:   2017-12-11T13:43:25Z

Added location awareness to LogContext.KafkaLogger




---


[jira] [Created] (KAFKA-6344) 0.8.2 clients will store invalid configuration in ZK for Kafka 1.0 brokers

2017-12-11 Thread Vincent Bernat (JIRA)
Vincent Bernat created KAFKA-6344:
-

 Summary: 0.8.2 clients will store invalid configuration in ZK for 
Kafka 1.0 brokers
 Key: KAFKA-6344
 URL: https://issues.apache.org/jira/browse/KAFKA-6344
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Vincent Bernat


Hello,

When using a Kafka 0.8.2 Scala client, the "changeTopicConfig" method from 
AdminUtils will write the topic name to /config/changes/config_change_X. 
Since 0.9, it is expected to have a JSON string and brokers will bail out if it 
is not the case with a java.lang.IllegalArgumentException with message "Config 
change notification has an unexpected value. The format is:{\"version\" : 1, 
\"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"} 
or {\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}. Received: 
\"dns\"". Moreover, the broker will shutdown after this error.

As 1.0 brokers are expected to accept 0.8.x clients, either highlight in the 
documentation this doesn't apply to AdminUtils or accept this "version 0" 
format.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4215: KAFKA-6121: Restore and global consumer should not...

2017-12-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4215


---


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2017-12-11 Thread Ted Yu
+1
 Original message From: Tom Bentley  
Date: 12/11/17  6:06 AM  (GMT-08:00) To: dev@kafka.apache.org Subject: Re: 
[VOTE] KIP-86: Configurable SASL callback handlers 
+1 (non-binding)

On 5 May 2017 at 11:57, Mickael Maison  wrote:

> Thanks for the KIP Rajini, this will significantly simplify providing
> custom credential providers
> +1 (non binding)
>
> On Wed, May 3, 2017 at 8:25 AM, Rajini Sivaram 
> wrote:
> > Can we have some more reviews or votes for this KIP to include in
> 0.11.0.0?
> > It is not a breaking change and the code is ready for integration, so it
> > will be good to get it in if possible.
> >
> > Ismael/Jun, since you had reviewed the KIP earlier, can you let me know
> if
> > I can do anything more to get your votes?
> >
> >
> > Thank you,
> >
> > Rajini
> >
> >
> > On Mon, Apr 10, 2017 at 12:18 PM, Edoardo Comar 
> wrote:
> >
> >> +1 (non binding)
> >> many thanks Rajini !
> >>
> >> --
> >> Edoardo Comar
> >> IBM MessageHub
> >> eco...@uk.ibm.com
> >> IBM UK Ltd, Hursley Park, SO21 2JN
> >>
> >> IBM United Kingdom Limited Registered in England and Wales with number
> >> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> >> 3AU
> >>
> >>
> >>
> >> From:   Rajini Sivaram 
> >> To: dev@kafka.apache.org
> >> Date:   06/04/2017 10:53
> >> Subject:    [VOTE] KIP-86: Configurable SASL callback handlers
> >>
> >>
> >>
> >> Hi all,
> >>
> >> I would like to start the voting process for KIP-86:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 86%3A+Configurable+SASL+callback+handlers
> >>
> >>
> >> The KIP makes callback handlers for SASL configurable to make it simpler
> >> to
> >> integrate with custom authentication database or custom authentication
> >> servers. This is particularly useful for SASL/PLAIN where the
> >> implementation in Kafka based on credentials stored in jaas.conf is not
> >> suitable for production use. It is also useful for SCRAM in environments
> >> where ZooKeeper is not secure.
> >>
> >> Thank you...
> >>
> >> Regards,
> >>
> >> Rajini
> >>
> >>
> >>
> >> Unless stated otherwise above:
> >> IBM United Kingdom Limited - Registered in England and Wales with number
> >> 741598.
> >> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >>
>


[jira] [Resolved] (KAFKA-6283) Configuration of custom SCRAM SaslServer implementations

2017-12-11 Thread Tom Bentley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley resolved KAFKA-6283.

Resolution: Duplicate

> Configuration of custom SCRAM SaslServer implementations
> 
>
> Key: KAFKA-6283
> URL: https://issues.apache.org/jira/browse/KAFKA-6283
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>
> It is difficult to supply configuration information to a custom 
> {{SaslServer}} implementation when a SCRAM mechanism is used. 
> {{SaslServerAuthenticator.createSaslServer()}} creates a {{SaslServer}} for a 
> given mechanism. The call to {{Sasl.createSaslServer()}} passes the broker 
> config and a callback handler. In the case of a SCRAM mechanism the callback 
> handler is a {{ScramServerCallbackHandler}} which doesn't have access to the 
> {{jaasContext}}. This makes it hard to configure a such a {{SaslServer}} 
> because I can't supply custom keys to the broker config (any unknown ones get 
> removed) and I don't have access to the JAAS config.
> In the case of a non-SCRAM {{SaslServer}}, I at least have access to the JAAS 
> config via the {{SaslServerCallbackHandler}}.
> A simple way to solve this would be to pass the {{jaasContext}} to the 
> {{ScramServerCallbackHandler}} from where a custom {{SaslServerFactory}} 
> could retrieve it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4274: KAFKA-6283: Configuration of custom SCRAM SaslServ...

2017-12-11 Thread tombentley
Github user tombentley closed the pull request at:

https://github.com/apache/kafka/pull/4274


---


Re: [VOTE] KIP-86: Configurable SASL callback handlers

2017-12-11 Thread Tom Bentley
+1 (non-binding)

On 5 May 2017 at 11:57, Mickael Maison  wrote:

> Thanks for the KIP Rajini, this will significantly simplify providing
> custom credential providers
> +1 (non binding)
>
> On Wed, May 3, 2017 at 8:25 AM, Rajini Sivaram 
> wrote:
> > Can we have some more reviews or votes for this KIP to include in
> 0.11.0.0?
> > It is not a breaking change and the code is ready for integration, so it
> > will be good to get it in if possible.
> >
> > Ismael/Jun, since you had reviewed the KIP earlier, can you let me know
> if
> > I can do anything more to get your votes?
> >
> >
> > Thank you,
> >
> > Rajini
> >
> >
> > On Mon, Apr 10, 2017 at 12:18 PM, Edoardo Comar 
> wrote:
> >
> >> +1 (non binding)
> >> many thanks Rajini !
> >>
> >> --
> >> Edoardo Comar
> >> IBM MessageHub
> >> eco...@uk.ibm.com
> >> IBM UK Ltd, Hursley Park, SO21 2JN
> >>
> >> IBM United Kingdom Limited Registered in England and Wales with number
> >> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants.
> PO6
> >> 3AU
> >>
> >>
> >>
> >> From:   Rajini Sivaram 
> >> To: dev@kafka.apache.org
> >> Date:   06/04/2017 10:53
> >> Subject:[VOTE] KIP-86: Configurable SASL callback handlers
> >>
> >>
> >>
> >> Hi all,
> >>
> >> I would like to start the voting process for KIP-86:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 86%3A+Configurable+SASL+callback+handlers
> >>
> >>
> >> The KIP makes callback handlers for SASL configurable to make it simpler
> >> to
> >> integrate with custom authentication database or custom authentication
> >> servers. This is particularly useful for SASL/PLAIN where the
> >> implementation in Kafka based on credentials stored in jaas.conf is not
> >> suitable for production use. It is also useful for SCRAM in environments
> >> where ZooKeeper is not secure.
> >>
> >> Thank you...
> >>
> >> Regards,
> >>
> >> Rajini
> >>
> >>
> >>
> >> Unless stated otherwise above:
> >> IBM United Kingdom Limited - Registered in England and Wales with number
> >> 741598.
> >> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6
> 3AU
> >>
>


RE: [DISCUSS]KIP-235 DNS alias and secured connections

2017-12-11 Thread Skrzypek, Jonathan
I agree that if there are hostnames in the list which don't correspond to any 
principal, then the connection will fail, but that's the way the SASL 
authentication with Kerberos works anyways, so we're not breaking anything here 
I think.
This is the current behaviour, if you put 3 FQDNs in bootstrap.servers today 
and one of them doesn't match, you will get AUTH_FAILED.

"Also I think you are suggesting that we update bootstrap servers to be the 
alias plus any other hostnames obtained from DNS lookup."
The suggested change doesn't "make" bootstrap servers the alias, it will 
resolve the alias to retrieve all underlying canonical host names, and will put 
all of them in the list of addresses returned by parseAndValidateAddresses() in 
ClientUtils.


Jonathan Skrzypek 
Middleware Engineering
Messaging Engineering
Goldman Sachs International

-Original Message-
From: Rajini Sivaram [mailto:rajinisiva...@gmail.com] 
Sent: 06 December 2017 12:58
To: dev
Subject: Re: [DISCUSS]KIP-235 DNS alias and secured connections

Sorry, the example I used with public/private DNS is wrong. But there is the 
general issue of multiple DNS names where only one is added to the 
keytab/certificate, but all names are added to the bootstrap server list.

On Wed, Dec 6, 2017 at 12:06 PM, Rajini Sivaram 
wrote:

> Hi Jonathan,
>
> Thank you for the KIP.
>
> I think you are proposing that we always do this (i.e. no option to 
> turn it off). If you have a private and public DNS name, at the 
> moment, if SSL certs and keytabs contain the only public DNS name and 
> the bootstrap servers and advertised listeners are configured to use 
> that name, everything works fine. With the proposed changes in the 
> KIP, the client would add the private name as well to the bootstrap 
> servers. So if a connection is made to the private name, that would 
> result in an authentication exception.
>
> Also I think you are suggesting that we update bootstrap servers to be 
> the alias plus any other hostnames obtained from DNS lookup. This 
> means that connections using the alias would fail with authentication 
> exception. We do not retry in the case of authentication exceptions 
> (and it makes it hard to diagnose security issues if we start 
> expecting some authentication failures to be ok).
>
>
> On Wed, Dec 6, 2017 at 10:43 AM, Stephane Maarek < 
> steph...@simplemachines.com.au> wrote:
>
>> Hi Jonathan
>>
>> I think this will be very useful. I reported something similar here :
>> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.or
>> g_jira_browse_KAFKA-2D4781=DwIFaQ=7563p3e2zaQw0AB1wrFVgyagb2IE5rT
>> ZOYPxLxfZlX4=nNmJlu1rR_QFAPdxGlafmDu9_r6eaCbPOM0NM1EHo-E=dZkDmlZ8
>> moKqpbKF8VNczw7mMEEp4T4erNSucDioFd0=C57W69bpQR4bqFBGTj3tXbJpYvACIY_
>> -5NUAq5LyrG8=
>>
>> Please confirm your kip will address it ?
>>
>> Stéphane
>>
>> On 6 Dec. 2017 8:20 pm, "Skrzypek, Jonathan" 
>> 
>> wrote:
>>
>> > True, amended the KIP, thanks.
>> >
>> > Jonathan Skrzypek
>> > Middleware Engineering
>> > Messaging Engineering
>> > Goldman Sachs International
>> >
>> >
>> > -Original Message-
>> > From: Tom Bentley [mailto:t.j.bent...@gmail.com]
>> > Sent: 05 December 2017 18:19
>> > To: dev@kafka.apache.org
>> > Subject: Re: [DISCUSS]KIP-235 DNS alias and secured connections
>> >
>> > Hi Jonathan,
>> >
>> > It might be worth mentioning in the KIP that this is necessary only 
>> > for
>> > *Kerberos* on SASL, and not other SASL mechanisms. Reading the JIRA 
>> > it makes sensem, but I was confused up until that point.
>> >
>> > Cheers,
>> >
>> > Tom
>> >
>> > On 5 December 2017 at 17:53, Skrzypek, Jonathan <
>> jonathan.skrzy...@gs.com>
>> > wrote:
>> >
>> > > Hi,
>> > >
>> > > I would like to discuss a KIP I've submitted :
>> > > https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.a
>> pache.org_
>> > > confluence_display_KAFKA_KIP-2D=DwIBaQ=7563p3e2zaQw0AB1w
>> rFVgyagb2I
>> > > E5rTZOYPxLxfZlX4=nNmJlu1rR_QFAPdxGlafmDu9_r6eaCbPOM0NM1EHo
>> -E=GWKXA
>> > > ILbqxFU2j7LtoOx9MZ00uy_jJcGWWIG92CyAuc=fv5WAkOgLhVOmF4vhEz
>> q_39CWnEo0
>> > > q0AJbqhAuDFDT0= 
>> > > 235%3A+Add+DNS+alias+support+for+secured+connection
>> > >
>> > > Feedback and suggestions welcome !
>> > >
>> > > Regards,
>> > > Jonathan Skrzypek
>> > > Middleware Engineering
>> > > Messaging Engineering
>> > > Goldman Sachs International
>> > > Christchurch Court - 10-15 Newgate Street London EC1A 7HD
>> > > Tel: +442070512977 <+44%2020%207051%202977>
>> > >
>> > >
>> >
>>
>
>


[jira] [Created] (KAFKA-6343) OOM as the result of creation of 5k topics

2017-12-11 Thread Alex Dunayevsky (JIRA)
Alex Dunayevsky created KAFKA-6343:
--

 Summary: OOM as the result of creation of 5k topics
 Key: KAFKA-6343
 URL: https://issues.apache.org/jira/browse/KAFKA-6343
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.1.1
 Environment: RHEL 7, RAM 755GB
Reporter: Alex Dunayevsky


Create 5k topics *from the code* - quickly, without any delays. Wait until 
brokers will finish loading them. This will actually never happen, since all 
brokers will go down after approx 10-15 minutes or more, depending on the 
hardware.
*Heap*: -Xmx/Xms: 5G, 10G, 50G, 256G... 
*Topology*: 3 brokers, 3 zk.

*Code for 5k topic creation:*
{code:java}
package kafka
import kafka.admin.AdminUtils
import kafka.utils.{Logging, ZkUtils}

object TestCreateTopics extends App with Logging {

  val zkConnect = "grid978:2185"
  var zkUtils = ZkUtils(zkConnect, 6000, 6000, isZkSecurityEnabled = false)

  for (topic <- 1 to 5000) {
AdminUtils.createTopic(
  topic = s"${topic.toString}",
  partitions= 10,
  replicationFactor = 2,
  zkUtils   = zkUtils
)
logger.info(s"Created topic ${topic.toString}")
  }
}
{code}

*OOM:*
{code:java}
java.io.IOException: Map failed
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:920)
at kafka.log.AbstractIndex.(AbstractIndex.scala:61)
at kafka.log.OffsetIndex.(OffsetIndex.scala:52)
at kafka.log.LogSegment.(LogSegment.scala:67)
at kafka.log.Log.loadSegments(Log.scala:255)
at kafka.log.Log.(Log.scala:108)
at kafka.log.LogManager.createLog(LogManager.scala:362)
at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:94)
at 
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at 
kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.scala:174)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:174)
at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:168)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:242)
at kafka.cluster.Partition.makeLeader(Partition.scala:168)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:758)
at 
kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:757)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:757)
at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:703)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:148)
at kafka.server.KafkaApis.handle(KafkaApis.scala:82)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Map failed
at sun.nio.ch.FileChannelImpl.map0(Native Method)
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:917)
... 28 more
{code}






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Requesting permission to publish KIP

2017-12-11 Thread Viktor Somogyi
Thanks Jun, will publish my KIP soon! :)

On Wed, Dec 6, 2017 at 6:43 PM, Jun Rao  wrote:

> Hi, Viktor,
>
> Just gave you the wiki permission.
>
> Thanks,
>
> Jun
>
> On Wed, Dec 6, 2017 at 8:51 AM, Viktor Somogyi 
> wrote:
>
> > Hi Kafka Owners,
> >
> > Could you please add me to the Confluence users group so I can publish my
> > KIP for https://issues.apache.org/jira/browse/KAFKA-5722?
> >
> > Thanks,
> > Viktor
> >
>


Re: 答复: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead metrics to KafkaConsumer

2017-12-11 Thread Mickael Maison
Sorry to only raise this now, but should we also update the
kafka-consumer-groups tool to display the start offset (and possibly
the lead) ?

Apart from that I'm +1 (non binding)
Thanks

On Mon, Dec 11, 2017 at 4:19 AM, Guozhang Wang  wrote:
> The by-laws ask for 72 hours only, since the starting of the vote, and
> since you have three binding votes you can close this voting now.
>
> Please conclude by a summary of the voting status including non-binding and
> binding votes, thanks.
>
>
> Guozhang
>
> On Sun, Dec 10, 2017 at 8:10 PM, Hu Xi  wrote:
>
>> Hi all,  Would we safely accept this KIP since three binding votes have
>> already been collected (from Jun, Guozhang and Becket)?
>>
>>
>> 
>> 发件人: Guozhang Wang 
>> 发送时间: 2017年12月6日 22:40
>> 收件人: dev@kafka.apache.org
>> 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> metrics to KafkaConsumer
>>
>> Hi Hu Xi,
>>
>> As I said before, it is only a clarification question for its internal
>> implementation; it is not related to the public interfaces.
>>
>>
>> Guozhang
>>
>> On Wed, Dec 6, 2017 at 12:34 AM, Hu Xi  wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> > Correct me if I am wrong. Of course I could construct the sensor where
>> > per-partition metrics reside as a child of the client-level
>> > "records-lead-min", but it seems that whether doing that way takes no
>> > effects on what this KIP gonna do,  so is it a must?
>> >
>> > 
>> > 发件人: Guozhang Wang 
>> > 发送时间: 2017年12月6日 15:05
>> > 收件人: dev@kafka.apache.org
>> > 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> > metrics to KafkaConsumer
>> >
>> > Hello Xi,
>> >
>> > You can see that in o.a.k.common.metrics.Sensor, we allow constructors to
>> > pass in one or more "parent" Sensors of the constructed Sensor, behind
>> the
>> > scene when a child sensor's metrics have been updated, the updates will
>> be
>> > propagated all the way up to its parents and ancestors (you can checkout
>> > the source code for its impl). On different clients we have been using
>> this
>> > to build many hierarchical sensors, like per-dest-broker metrics v.s.
>> > all-dest-broker metrics on selector, etc.
>> >
>> > My understanding is that the cross-all-partitions "records-lead-min" will
>> > be constructed as the parent of all the per-partition "records-lead-min",
>> > is that true?
>> >
>> >
>> > Guozhang
>> >
>> > On Mon, Dec 4, 2017 at 11:26 PM, Hu Xi  wrote:
>> >
>> > > Guozhang,
>> > >
>> > >
>> > > Thanks for the vote and comments. I am not sure if I fully understand
>> the
>> > > parent metrics here. This KIP will introduce a client-level metric
>> named
>> > > 'records-lead-min' and three per-partition metrics tagged with
>> > > topic Is it the child-parent relationship you mean?
>> > >
>> > >
>> > > 
>> > > 发件人: Guozhang Wang 
>> > > 发送时间: 2017年12月5日 15:16
>> > > 收件人: dev@kafka.apache.org
>> > > 主题: Re: [VOTE] KIP-223 - Add per-topic min lead and per-partition lead
>> > > metrics to KafkaConsumer
>> > >
>> > > Thanks Hu Xi,
>> > >
>> > > I made a pass over the KIP and it lgtm. +1.
>> > >
>> > > Just a clarification question: for the cross-partition
>> "records-lead-min"
>> > > metric, would that be implemented as a parent metric of the
>> per-partition
>> > > metrics?
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Mon, Dec 4, 2017 at 3:07 PM, Dong Lin  wrote:
>> > >
>> > > > +1 (non-binding)
>> > > >
>> > > > On Wed, Nov 29, 2017 at 7:05 PM, Hu Xi  wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > As I didn't see any further discussion around this KIP, I'd like to
>> > > start
>> > > > > voting.
>> > > > >
>> > > > > KIP documentation:
>> > > > >
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > > > 223+-+Add+per-topic+min+lead+and+per-partition+lead+
>> > > > > metrics+to+KafkaConsumer
>> > > > >
>> > > > >
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > huxihx
>> > > > >
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang


[jira] [Created] (KAFKA-6342) Remove workaround for JSON parsing of non-escaped strings

2017-12-11 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6342:
-

 Summary: Remove workaround for JSON parsing of non-escaped strings
 Key: KAFKA-6342
 URL: https://issues.apache.org/jira/browse/KAFKA-6342
 Project: Kafka
  Issue Type: Task
  Components: core
Reporter: Rajini Sivaram
 Fix For: 1.1.0


KAFKA-6319 added a workaround to parse invalid JSON persisted using older 
versions of Kafka because special characters were not escaped. The workaround 
is required in 1.0.1 to enable parsing invalid JSON from ACL configs in 
ZooKeeper. We can remove the workaround in kafka.utils.Json#parseFull for 1.1.0



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-225 - Use tags for consumer “records.lag” metrics

2017-12-11 Thread charly molter
Hi,
The KIP has been updated. As it has change should I restart the vote?

In any case I'm still missing one binding vote if anyone wants to help.
Thanks!

On Wed, Dec 6, 2017 at 6:13 PM, charly molter 
wrote:

> Sounds good I'll update the KIP
>
> On Wed, Dec 6, 2017 at 6:04 PM, Becket Qin  wrote:
>
>> Hi Charly,
>>
>> Personally I prefer emitting both and deprecate old one. This does not
>> block on the 2.0 release and we don't need to worry about more users
>> picking up the old metric in 1.1 release.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Dec 5, 2017 at 4:08 AM, charly molter 
>> wrote:
>>
>> > Thanks Jun and Becket!
>> >
>> > I think your point about 1.0 vs 2.0 makes sense I can update the KIP to
>> > reflect this.
>> >
>> > What's the process for 2.0 contributions as I can see that trunk is 1.1
>> and
>> > no 2.x branch?
>> >
>> > Here's what I can do:
>> > - Not write the code change until trunk moves to 2.0.
>> > - Write the change but leave the PR open until we start working on 2.0.
>> > - Stall this KIP until 2.0 development starts (IIRC it's pretty soon).
>> > - Do it in a backward compatible way (publish both sets of metrics) and
>> > open a Jira tagged on 2.0 to remove the old metrics.
>> >
>> > Let me know what's the right way to go.
>> >
>> > Thanks!
>> >
>> >
>> > On Tue, Dec 5, 2017 at 12:45 AM, Becket Qin 
>> wrote:
>> >
>> > > Thanks for the KIP, Charly.
>> > >
>> > > +1. The proposal looks good to me. I agree with Jun that it is better
>> to
>> > > make the metrics consistent with other metrics. That being said,
>> arguably
>> > > this is a backwards incompatible change. Since we are at 1.0,
>> backwards
>> > > incompatible changes are supposed to be in 2.0. Not sure if that is
>> the
>> > > plan or not.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Mon, Dec 4, 2017 at 4:20 PM, Jun Rao  wrote:
>> > >
>> > > > Hi, Jiangjie,
>> > > >
>> > > > Since you proposed the original KIP-92, do you want to see if this
>> KIP
>> > > > makes sense?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Jun
>> > > >
>> > > > On Wed, Nov 22, 2017 at 2:48 AM, charly molter <
>> > charly.mol...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > I would like to start the voting thread for KIP-225.
>> > > > > This KIP proposes to correct some lag metrics emitted by the
>> > consumer.
>> > > > >
>> > > > > The KIP wiki is here:
>> > > > > https://cwiki.apache.org/confluence/x/uaBzB
>> > > > >
>> > > > > The discussion thread is here:
>> > > > > http://search-hadoop.com/m/Kafka/uyzND1F33uL19AYx/threaded
>> > > > >
>> > > > > Also could someone assign me to this Jira: KAFKA-5890
>> > > > > 
>> > > > >
>> > > > > Thanks,
>> > > > > --
>> > > > > Charly Molter
>> > > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > Charly Molter
>> >
>>
>
>
>
> --
> Charly Molter
>



-- 
Charly Molter


[jira] [Created] (KAFKA-6341) 'networkThreadTimeNanos' in KafkaChannel is not thread safe

2017-12-11 Thread huxihx (JIRA)
huxihx created KAFKA-6341:
-

 Summary: 'networkThreadTimeNanos' in KafkaChannel is not thread 
safe
 Key: KAFKA-6341
 URL: https://issues.apache.org/jira/browse/KAFKA-6341
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 1.0.0
Reporter: huxihx


`networkThreadTimeNanos` in KafkaChannel is of primitive long type which is not 
thread safe. Multiple Processor threads could access(read and write) this 
variable at the same time. Since JVM spec does not guarantee of the atomic 
64-bit operations against long/double types, it's safer to employ AtomicLong 
instead of the naive long type.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)