Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-28 Thread Boyang Chen
Thanks Guozhang for the new proposal here!

So I'd like to propose a slightly modified version of LeaveGroupRequest:
instead of letting the static member consumer clients themselves send the
request (which means we would still need some hidden configs to turn it
off, as we do today), how about letting any other client send this request,
since the LeaveGroupRequest only requires group.id and member.id? So back to
your operational scenarios: if some static member has crashed and is not
likely to come back, or we simply want to shrink the size of the group by
shutting down some static members, we can use an admin client to send the
LeaveGroupRequest after the instance has completely shut down or crashed,
to kick it out of the group and trigger a rebalance.

One issue, though, is that users may not know the member id required in the
LeaveGroupRequest. To work around this we can add `group.instance.id`
alongside the member id and allow the member id to be nullable. The
coordinator logic would then be modified as follows: 1) if member.id is
specified, ignore instance.id and always use member.id to find the member to
kick out; 2) otherwise, use instance.id to find the corresponding member.id
and kick it out; 3) if neither is found, reject with an error code.

So in sum, the alternative changes are:

a) Modify LeaveGroupRequest to add group.instance.id
b) Modify the coordinator logic to handle such requests on the broker side.
c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
instanceId)" which will be translated into a LeaveGroupRequest.
d) [Optional] We can even batch the request by allowing
"removeMemberFromGroup(groupId, list[instanceId])" and then make the
`member.id` and `instance.id` fields of LeaveGroupRequest arrays instead of
single entries.
e) We can also remove the admin ConsumerRebalanceRequest for simplicity
(why not? I'm in favor of keeping as few request protocols as possible :),
as it is not needed anymore with the above proposal.
I agree that reusing LeaveGroupRequest is actually a good idea: we only need
to iterate on an existing request format. Also, I realized we haven't
discussed how we want to enable this feature for Streams applications, which
differ from plain consumer applications in that a Streams app uses each
stream thread as an individual consumer.
For example, if the user specifies the client id, the stream consumer client
id will look like:

User client id + "-StreamThread-" + thread id + "-consumer"

So I'm thinking we should do something similar for defining group.instance.id
for Streams. We could define another config called `stream.instance.id` to be
used as a prefix, and for each thread's consumer the formula would look like:

`group.instance.id` = `stream.instance.id` + "-" + thread id + "-consumer"
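To make the formula concrete, here is a minimal, self-contained sketch; note
that `stream.instance.id` is only a config proposed in this thread, and the
prefix and thread count below are made-up values:

public class StreamInstanceIdSketch {

    static String groupInstanceId(String streamInstanceId, int threadId) {
        // group.instance.id = stream.instance.id + "-" + thread id + "-consumer"
        return streamInstanceId + "-" + threadId + "-consumer";
    }

    public static void main(String[] args) {
        String streamInstanceId = "payment-app-0";   // hypothetical stream.instance.id
        int numStreamThreads = 4;                    // num.stream.threads
        for (int threadId = 1; threadId <= numStreamThreads; threadId++) {
            System.out.println(groupInstanceId(streamInstanceId, threadId));
        }
    }
}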

And for ease of use, the leave group request interface could take a
`group.instance.id.prefix` instead of a `group.instance.id`, so that we could
batch-remove all consumers belonging to a single Streams instance. This is
more intuitive and flexible, since specifying the names of 16~32 * n
(n = number of Streams instances to shut down) consumers is not an easy job
without client management tooling.

How does this workaround sound?

Boyang


Jenkins build is back to normal : kafka-2.1-jdk8 #64

2018-11-28 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-7686) Remove PowerMock from Connect Tests

2018-11-28 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7686:
--

 Summary: Remove PowerMock from Connect Tests
 Key: KAFKA-7686
 URL: https://issues.apache.org/jira/browse/KAFKA-7686
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar


Remove PowerMock from Connect Tests



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3226

2018-11-28 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7671: Stream-Global Table join should not reset repartition 
flag

[junrao] Forward topic from console consumer to deserializer (#5704)

--
[...truncated 2.49 MB...]
(The rest of this message is the truncated Gradle test log: a long run of
org.apache.kafka.streams.test.OutputVerifierTest,
org.apache.kafka.streams.scala.StreamToTableJoinScalaIntegrationTestImplicitSerdes,
and org.apache.kafka.streams.scala.WordCountTest cases, all reported as
STARTED and PASSED.)


Re: [DISCUSS] KIP-394: Require member.id for initial join group request

2018-11-28 Thread Boyang Chen
Thanks Matthias for the question, and Stanislav for the explanation!

For the scenario described, we will never let a member join the GroupMetadata 
map
if it uses UNKNOWN_MEMBER_ID. So the workflow will be like this:

  1.  Group is empty. Consumer c1 starts and joins with UNKNOWN_MEMBER_ID;
  2.  Broker rejects the request but allocates a member.id to c1 in the response (c1's protocol version is current);
  3.  c1 handles the error and rejoins with the assigned member.id;
  4.  Broker stores c1 in its group metadata;
  5.  Consumer c2 starts and joins with UNKNOWN_MEMBER_ID;
  6.  Broker rejects the request but allocates a member.id to c2 in the response (c2's protocol version is current);
  7.  c2 fails to get the response / crashes in the middle;
  8.  After a certain time, c2 restarts the join request with UNKNOWN_MEMBER_ID;

As you can see, c2 will repeat steps 6~8 until it successfully sends a join
group request with the allocated id. Only then will the broker include c2 in
its group metadata map.
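To illustrate the retry loop, here is a self-contained toy simulation of the
flow above; the names mirror the discussion (UNKNOWN_MEMBER_ID, the "member id
required" rejection), but the classes are placeholders, not the actual consumer
or broker internals:

import java.util.UUID;

public class JoinFlowSketch {
    enum Error { NONE, MEMBER_ID_REQUIRED }

    static final String UNKNOWN_MEMBER_ID = "";

    static class JoinResponse {
        final Error error;
        final String memberId;
        JoinResponse(Error error, String memberId) { this.error = error; this.memberId = memberId; }
    }

    // Toy "broker": rejects unknown members (steps 2/6) but hands back a freshly
    // allocated member.id; known members are stored in group metadata (step 4).
    static JoinResponse joinGroup(String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            return new JoinResponse(Error.MEMBER_ID_REQUIRED, UUID.randomUUID().toString());
        }
        return new JoinResponse(Error.NONE, memberId);
    }

    public static void main(String[] args) {
        String memberId = UNKNOWN_MEMBER_ID;
        while (true) {
            JoinResponse resp = joinGroup(memberId);
            if (resp.error == Error.MEMBER_ID_REQUIRED) {
                memberId = resp.memberId;  // steps 3/8: rejoin with the allocated id
                continue;
            }
            System.out.println("Joined with member.id " + memberId);
            break;
        }
    }
}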

Does this sound clear to you?

Best,
Boyang

From: Stanislav Kozlovski 
Sent: Wednesday, November 28, 2018 7:39 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join group request

Hey Matthias,

I think the notion is to have the `session.timeout.ms` start ticking
when the broker responds with the member.id. Then, the broker would
properly expire consumers and not hold on to too many stale ones.
This isn't mentioned in the KIP though, so it is worth waiting for Boyang to
confirm.

On Wed, Nov 28, 2018 at 3:10 AM Matthias J. Sax 
wrote:

> Thanks for the KIP Boyang.
>
> I guess I am missing something, but I am still learning more details
> about the rebalance protocol, so maybe you can help me out?
>
> Assume a client sends UNKNOWN_MEMBER_ID in its first joinGroup request.
> The broker generates a `member.id` and sends it back via
> `MEMBER_ID_REQUIRED` error response. This response might never reach the
> client or the client fails before it can send the second joinGroup
> request. Thus, a client would need to start over with a new
> UNKNOWN_MEMBER_ID in its joinGroup request. Thus, the broker needs to
> generate a new `member.id` again.
>
> So it seems the problem is moved, but not resolved? The motivation of
> the KIP is:
>
> > The edge case is that if initial join group request keeps failing due to
> connection timeout, or the consumer keeps restarting,
>
> From my understanding, this KIP moves the issue from the first to the
> second joinGroup request (or the broker's joinGroup response).
>
> But maybe I am missing something. Can you help me out?
>
>
> -Matthias
>
>
> On 11/27/18 6:00 PM, Boyang Chen wrote:
> > Thanks Stanislav and Jason for the suggestions!
> >
> >
> >> Thanks for the KIP. Looks good overall. I think we will need to bump the
> >> version of the JoinGroup protocol in order to indicate compatibility
> with
> >> the new behavior. The coordinator needs to know when it is safe to
> assume
> >> the client will handle the error code.
> >>
> >> Also, I was wondering if we could reuse the REBALANCE_IN_PROGRESS error
> >> code. When the client sees this error code, it will take the memberId
> from
> >> the response and rejoin. We'd still need the protocol bump since older
> >> consumers do not have this logic.
> >
> > I will add the join group protocol version change to the KIP. Meanwhile
> I feel for
> > understandability it's better to define a separate error code since
> REBALANCE_IN_PROGRESS
> > is not the actual cause of the returned error.
> >
> >> One small question I have is now that we have one and a half round-trips
> >> needed to join in a rebalance (1 full RT addition), is it worth it to
> >> consider increasing the default value of `
> group.initial.rebalance.delay.ms`?
> > I guess we could keep it for now. After KIP-345 and incremental
> cooperative rebalancing
> > work we should be safe to deprecate `group.initial.rebalance.delay.ms`.
> Also one round trip
> > shouldn't increase the latency too much IMO.
> >
> > Best,
> > Boyang
> > 
> > From: Stanislav Kozlovski 
> > Sent: Wednesday, November 28, 2018 2:32 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join
> group request
> >
> > Hi Boyang,
> >
> > The KIP looks very good.
> > One small question I have is now that we have one and a half round-trips
> > needed to join in a rebalance (1 full RT addition), is it worth it to
> > consider increasing the default value of `
> group.initial.rebalance.delay.ms`?
> >
> > Best,
> > Stanislav
> >
> > On Tue, Nov 27, 2018 at 5:39 PM Jason Gustafson 
> wrote:
> >
> >> Hi Boyang,
> >>
> >> Thanks for the KIP. Looks good overall. I think we will need to bump the
> >> version of the JoinGroup protocol in order to indicate compatibility
> with
> >> the new behavior. The coordinator needs to know when it is safe to
> assume
> >> the client will handle the error code.
> >>
> >> Also, I was wondering 

[jira] [Resolved] (KAFKA-7449) Kafka console consumer is not sending topic to deserializer

2018-11-28 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7449.

   Resolution: Fixed
 Assignee: Mathieu Chataigner
Fix Version/s: 2.2.0

Merged to trunk.

> Kafka console consumer is not sending topic to deserializer
> ---
>
> Key: KAFKA-7449
> URL: https://issues.apache.org/jira/browse/KAFKA-7449
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Mathieu Chataigner
>Assignee: Mathieu Chataigner
>Priority: Major
>  Labels: easyfix, pull-request-available
> Fix For: 2.2.0
>
>
> We tried to create a custom Deserializer to consume some protobuf topics.
> We have a mechanism for getting the protobuf class from the topic name;
> however, the console consumer is not forwarding the topic of the consumer
> record down to the deserializer.
> Topic information is available in the ConsumerRecord.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-28 Thread Kevin Lu
@Manikumar I have updated the Compatibility section to note that this
option is not supported with the deprecated "--zookeeper" option.

We still need 1 more binding vote for this to be accepted.

Thanks~

Regards,
Kevin

On Wed, Nov 28, 2018 at 9:23 AM Gwen Shapira  wrote:

> +1 (binding)
> On Thu, Nov 8, 2018 at 1:37 PM Kevin Lu  wrote:
> >
> > Hi All,
> >
> > I'm starting the vote thread for KIP-351: Add --under-min-isr option to
> > describe topics command.
> >
> > KIP:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
> >
> > Discussion thread:
> >
> https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
> >
> > Thanks!
> >
> > Regards,
> > Kevin
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [DISCUSS] KIP-379: Multiple Consumer Group Management

2018-11-28 Thread Jason Gustafson
Hi Alex,

Sorry for the late reply. Your message didn't get attached to the main
thread and I missed it.

#1. Yeah, I think we should continue to accept the old csv format.
Compatibility is important for all public APIs.
#2. I think this is less important from a compatibility perspective. On the
one hand, it makes the output compatible with currently supported usage. On
the other, it makes it more annoying to write tools which invoke this
command because they need to treat the single group case separately. I'm
probably leaning toward not doing this one, but I don't have a strong
opinion.
#3. To clarify, my suggestion was to put the group id first. I think Vahid
was in agreement. From your comment, it sounds like you agree as well?
#4. I agree supporting regex can be left for future work.

Thanks,
Jason

On Mon, Nov 5, 2018 at 7:55 AM Alex D  wrote:

> Hello guys,
>
> Thank you for your suggestions!
> I've made a short resume of all suggestions proposed for further
> possible code corrections.
> Since not all opinions match, let's review once again and decide.
>
> #1. Support old csv format. Proposed by Jason.
> Yes: Jason, Vahid
>
> If backwards compatibility is important for this specific (and, I
> believe, infrequent) case, I'm ready to make corrections. Final word?
>
> #2. Do not show group name for `--describe` output in case a single
> `--group` is specified. Proposed by Jason.
> Yes: Jason
>
> Alternatively, the user will always expect the output to be the
> same
> for any `--describe` query. Ready to make corrections if this is
> important. Final word?
>
> #3. GROUP column should not be the first in the row. Proposed by Jason.
> Yes: Jason
> No:  Vahid
>
> For the group offset configuration, the group entity appears to be
> the top priority, and starting the table with a GROUP column makes more
> sense, I believe. Plus, it's quicker and easier to spot which group
> the offsets belong to.
> Apply corrections or leave as is?
>
> #4. Single regex vs multiple `--group` flags. Proposed by eazama..
>
> There are a few reasons behind this. Firstly, there are no rules for
> defining group names, unlike topic names, which have their own
> validation routine based on a simple regex. Group names may
> contain any symbols, and simply splitting them by comma won't
> work, at least not without escape characters. Secondly,
> repetition of the `--group` flag has already been implemented for the
> group deletion logic and we don't want to break backwards
> compatibility. Finally, visually, it's a bit easier to read and
> compose a long query with a large number of groups than throwing
> everything into one very long string.
>
> #5. Valid scenario where we would want to delete all consumer groups.
> Asked by Vahid.
>
> There should be one, I believe ;) Already received a few requests
> from colleagues.
>
> # KIP approvals:
> Suman: +1
>
> > Sat, 20 Oct 2018 17:10:16 GMT,  wrote:
> > Is there a reason for using multiple --group flags over having it accept
> a regex?
> >
> > The topics command currently accepts a regex for most operations and
> doesn't support using
> > multiple topics flags. It seems like it would be better to take a more
> standardized approach
> > to providing this type of information.
> >
> >
> >> On Oct 19, 2018, at 10:28 AM, Suman B N  wrote:
> >>
> >> This eases debugging metadata information of consumer groups and
> offsets in
> >> case of client hungs which we have been facing frequently.
> >> +1 from me. Well done Alex!
> >>
> >> -Suman
> >>
> >> On Fri, Oct 19, 2018 at 8:36 PM Vahid Hashemian <
> vahid.hashem...@gmail.com>
> >> wrote:
> >>
> >>> Thanks for proposing the KIP. Looks good to me overall.
> >>>
> >>> I agree with Jason's suggestion that it would be best to keep the
> current
> >>> output format when a single '--group' is present. Because otherwise,
> there
> >>> would be an impact to users who rely on the current output format.
> Also,
> >>> starting with a GROUP column makes more sense to me.
> >>>
> >>> Also, and for my own info, is there a valid scenario where we would
> want to
> >>> delete all consumer groups? It sounds to me like a potentially
> dangerous
> >>> feature. I would imagine that it would help more with dev/test
> >>> environments, where we normally have a few groups (for which the
> repeating
> >>> '--group' option should work).
> >>>
> >>> Regards!
> >>> --Vahid
> >>>
> >>> On Thu, Oct 18, 2018 at 11:28 PM Jason Gustafson 
> >>> wrote:
> >>>
>  Hi Alex,
> 
>  Thanks for the KIP. I think it makes sense, especially since most of
> the
>  group apis are intended for batching anyway.
> 
>  The only questions I have are about compatibility. For example, the
> csv
>  format for resetting offsets is changed, so will we continue to
> support
> >>> the
>  old format? Also, if only one `--group` option is passed, do you 

Re: [DISCUSS] KIP-392: Allow consumers to fetch from the closest replica

2018-11-28 Thread Jun Rao
Hi, Jason,

Thanks for the KIP. Looks good overall. A few minor comments below.

1. The section on handling FETCH_OFFSET_TOO_LARGE error says "Use the
OffsetForLeaderEpoch API to verify the current position with the leader".
The OffsetForLeaderEpoch request returns log end offset if the request
leader epoch is the latest. So, we won't know the true high watermark from
that request. It seems that the consumer still needs to send ListOffset
request to the leader to obtain high watermark?

2. If a non in-sync replica receives a fetch request from a consumer,
should it return a new type of error like ReplicaNotInSync?

3. Could ReplicaSelector be closable?

4. Currently, the ISR propagation from the leader to the controller can be
delayed up to 60 secs through ReplicaManager.IsrChangePropagationInterval.
In that window, the consumer could still be consuming from a non in-sync
replica. The relatively large delay is mostly for reducing the ZK writes
and the watcher overhead. Not sure what's the best way to address this. We
could potentially make this configurable.

5. It may be worth mentioning that, to take advantage of affinity, one may
also want to have a customized PartitionAssignor to have an affinity aware
assignment in addition to a customized ReplicaSelector.
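To illustrate point 5, here is a purely hypothetical sketch of an
affinity-aware selector; the ReplicaSelector plug-in was still under
discussion at this point, so the interface shape and types below are
assumptions rather than the KIP's API:

import java.util.*;

public class RackAwareSelectorSketch {

    interface ReplicaSelector {
        // Pick the broker id a consumer in `clientRack` should fetch from.
        Optional<Integer> select(String clientRack, Map<Integer, String> replicaRacks, Set<Integer> isr);
    }

    // Affinity-aware choice: prefer an in-sync replica in the client's rack,
    // otherwise fall back to any in-sync replica.
    static class RackAwareSelector implements ReplicaSelector {
        @Override
        public Optional<Integer> select(String clientRack, Map<Integer, String> replicaRacks, Set<Integer> isr) {
            Optional<Integer> local = isr.stream()
                    .filter(b -> clientRack.equals(replicaRacks.get(b)))
                    .findFirst();
            return local.isPresent() ? local : isr.stream().findFirst();
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> racks = new HashMap<>();
        racks.put(1, "us-east-1a");
        racks.put(2, "us-east-1b");
        racks.put(3, "us-east-1c");
        Set<Integer> isr = new HashSet<>(Arrays.asList(1, 2, 3));
        System.out.println(new RackAwareSelector().select("us-east-1b", racks, isr)); // Optional[2]
    }
}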

Thanks,

Jun

On Wed, Nov 21, 2018 at 12:54 PM Jason Gustafson  wrote:

> Hi All,
>
> I've posted a KIP to add the often-requested support for fetching from
> followers:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> .
> Please take a look and let me know what you think.
>
> Thanks,
> Jason
>


Build failed in Jenkins: kafka-trunk-jdk8 #3225

2018-11-28 Thread Apache Jenkins Server
See 


Changes:

[colin] MINOR: Support long maxMessages in Trogdor consume/produce bench workers

--
[...truncated 2.64 MB...]
(The rest of this message is the truncated Gradle test log: a long run of
org.apache.kafka.connect.runtime.ConnectMetricsTest and
org.apache.kafka.connect.converters.* (ShortConverterTest, FloatConverterTest,
ByteArrayConverterTest) cases, all reported as STARTED and PASSED.)


Problem in CI for pull request

2018-11-28 Thread lk gen
Hi,

  I made a pull request and it passed CI on JDK 11 but failed on JDK 8

  I think the JDK 8 error may not be related to my commit but to an
environment problem on the CI

  How can I rerun the CI for my pull request ?

  The pull request is at
https://github.com/apache/kafka/pull/5960

error states

*19:27:48* ERROR: H36 is offline; cannot locate JDK 1.8 (latest)
*19:27:48* ERROR: H36 is offline; cannot locate Gradle 4.8.1


Thanks


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-28 Thread Guozhang Wang
Hi Boyang,

I was thinking that with the optional static members in the admin
ConsumerRebalanceRequest it should be sufficient to kick out a static
member before its session timeout (arguably long in practice) has been
reached. But now I see your concern is that in some situations the admin
operators may not even know the full list of static members, but ONLY know
which static member has failed and hence which member they would like to
kick out of the group.

So I'd like to propose a slightly modified version of LeaveGroupRequest:
instead of letting the static member consumer clients themselves send the
request (which means we would still need some hidden configs to turn it
off, as we do today), how about letting any other client send this request,
since the LeaveGroupRequest only requires group.id and member.id? So back to
your operational scenarios: if some static member has crashed and is not
likely to come back, or we simply want to shrink the size of the group by
shutting down some static members, we can use an admin client to send the
LeaveGroupRequest after the instance has completely shut down or crashed,
to kick it out of the group and trigger a rebalance.

One issue, though, is that users may not know the member id required in the
LeaveGroupRequest. To work around this we can add `group.instance.id`
alongside the member id and allow the member id to be nullable. The
coordinator logic would then be modified as follows: 1) if member.id is
specified, ignore instance.id and always use member.id to find the member to
kick out; 2) otherwise, use instance.id to find the corresponding member.id
and kick it out; 3) if neither is found, reject with an error code.
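To make the resolution order concrete, here is a rough, self-contained
sketch; the map and the return type are placeholders, not actual
GroupCoordinator code:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class LeaveGroupResolutionSketch {

    static Optional<String> resolveMemberToRemove(String memberId, String instanceId,
                                                  Map<String, String> instanceIdToMemberId) {
        if (memberId != null) {
            return Optional.of(memberId);                             // (1) member.id always wins
        }
        if (instanceId != null && instanceIdToMemberId.containsKey(instanceId)) {
            return Optional.of(instanceIdToMemberId.get(instanceId)); // (2) resolve via instance.id
        }
        return Optional.empty();                                      // (3) caller rejects with an error code
    }

    public static void main(String[] args) {
        Map<String, String> mapping = new HashMap<>();
        mapping.put("instance-1", "member-abc");
        System.out.println(resolveMemberToRemove(null, "instance-1", mapping)); // Optional[member-abc]
        System.out.println(resolveMemberToRemove(null, "instance-9", mapping)); // Optional.empty
    }
}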

So in sum, the alternative changes are:

a) Modify LeaveGroupRequest to add group.instance.id
b) Modify the coordinator logic to handle such requests on the broker side.
c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
instanceId)" which will be translated into a LeaveGroupRequest.
d) [Optional] We can even batch the request by allowing
"removeMemberFromGroup(groupId, list[instanceId])" and then make the
`member.id` and `instance.id` fields of LeaveGroupRequest arrays instead of
single entries.
e) We can also remove the admin ConsumerRebalanceRequest for simplicity
(why not? I'm in favor of keeping as few request protocols as possible :),
as it is not needed anymore with the above proposal.
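As a rough illustration of what the AdminClient addition in (c)/(d) might look
like (nothing below is a real Kafka API; the interface, the return value, and
the way the request is built are placeholders for the idea above):

import java.util.Arrays;
import java.util.List;

public class RemoveMembersSketch {

    interface GroupAdmin {
        // (d) batched form: one LeaveGroupRequest carrying all instance ids, with
        // member.id left null so the coordinator resolves each instance id itself.
        List<String> removeMemberFromGroup(String groupId, List<String> instanceIds);
    }

    public static void main(String[] args) {
        GroupAdmin admin = (groupId, instanceIds) -> {
            System.out.println("Would send LeaveGroupRequest(group=" + groupId
                    + ", instanceIds=" + instanceIds + ")");
            return instanceIds; // pretend all of them were removed
        };
        admin.removeMemberFromGroup("GroupA", Arrays.asList("instance-1", "instance-2"));
    }
}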


WDYT?


Guozhang

On Wed, Nov 28, 2018 at 5:34 AM Boyang Chen  wrote:

> Thanks Guozhang and Mayuresh for the follow up! Answers are listed below.
>
>
> >  5. Regarding "So in summary, *the member will only be removed due to
> > session timeout*. We shall remove it from both in-memory static member
> name
> > mapping and member list." If the rebalance is invoked manually using the
> > the admin apis, how long should the group coordinator wait for the
> members
> > of the group to send a JoinGroupRequest for participating in the
> rebalance?
> > How is a lagging consumer handled?
>
> Great question. Let's use c1~c4 example here:
>
>   1.  Consumer c1, c2, c3, c4 in stable state
>   2.  c4 goes down and we detect this issue before session timeout through
> client monitoring. Initiate a ConsumerRebalanceRequest.
>   3.  A rebalance will be kicking off, and after rebalance timeout we
> shall keep the same assignment for c1~4, if the session timeout for c4
> hasn't reached
>   4.  Group back to stable with c1~4 (although c4 is actually offline)
>   5.  c4 session timeout finally reached: another rebalance triggered.
>
> For step 3, if session timeout triggered within rebalance timeout, only
> c1~3 will be participating in the rebalance. This is what we mean by saying
> "rebalance
> timeout shall not remove current members, only session timeout will do."
> As you could see this is not an ideal scenario: we trigger extra rebalance
> at step 5. In my reply to Guozhang I'm asking whether we should still use
> LeaveGroupRequest for static members to send a signal to broker saying "I'm
> currently offline", and when we send ConsumerRebalanceRequest to broker, we
> will actually kick off c4 because it says it's offline already, saving one
> or multiple additional rebalances later. This way the
> ConsumerRebalanceRequest will be more effective in making correct judgement
> on the group status since we have more feedback from client side.
>
> > - When we say that we would use invokeConsumerRebalance(groupId) to down
> > scale, with the example in the above question, how will the
> > GroupCoordinator know that c4 should be kicked out of the group since we
> > are trying to invoke rebalance proactively without waiting for c4's
> session
> > time out to expire. Should there be a way of telling the GroupCoordinator
> > that consumer c4 has been kicked out of the groupId = "GroupA"?
> Previous proposal should be suffice to answer this question 
>
> - Also it looks like the statement "If the `member.id` uses
> > 

Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-28 Thread Gwen Shapira
+1 (binding)
On Thu, Nov 8, 2018 at 1:37 PM Kevin Lu  wrote:
>
> Hi All,
>
> I'm starting the vote thread for KIP-351: Add --under-min-isr option to
> describe topics command.
>
> KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
>
> Discussion thread:
> https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
>
> Thanks!
>
> Regards,
> Kevin



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

2018-11-28 Thread Pellerin, Clement
If no one objects, I'll rewrite the KIP like this and resubmit for discussion:
Ideal interface, ignore the atomicity issue, and only the keystore and
truststore are reconfigurable.

I was not planning to log a Jira for the CertificateEntries comparison bug 
because my fix will correct it anyway.

-Original Message-
From: Pellerin, Clement 
Sent: Wednesday, November 21, 2018 10:42 AM
To: dev@kafka.apache.org
Subject: RE: [DISCUSS] KIP-383 Pluggable interface for SSL Factory

This turned out to be trickier than I thought and I don't have a firm solution 
yet.

Since SslFactory will handle reconfiguration, the idea is to make the 
configuration immutable in the pluggable factory.
SslFactory would create a new pluggable factory every time the configuration 
changes.
The pluggable factory creates its SSLContext when it is configured and never
changes it.
It turns out SslFactory does not really need the SSLContext, so it can use the 
new pluggable factory as an SSLEngine factory.

The ideal interface looks like:

public interface SslEngineFactory {
    void configure(Map<String, ?> configs);
    SSLEngine createSslEngine(String peerHost, int peerPort);
}
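As a side note, a minimal sketch of a plug-in implementing this interface
might look like the following; it leans on the JDK's default SSLContext purely
for illustration, whereas a real factory would build an immutable context from
the configured keystore/truststore settings:

import java.security.NoSuchAlgorithmException;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class DefaultSslEngineFactory implements SslEngineFactory {
    private SSLContext sslContext;

    @Override
    public void configure(Map<String, ?> configs) {
        try {
            // A real implementation would load key/trust material from `configs`
            // here and build an immutable SSLContext once, as discussed above.
            this.sslContext = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    @Override
    public SSLEngine createSslEngine(String peerHost, int peerPort) {
        SSLEngine engine = sslContext.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(false); // server-side engine in this sketch
        return engine;
    }
}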

This assumes SslFactory copies the configs and overwrites ssl.client.auth when 
there is a clientAuthConfigOverride.
I am told the right way to do this in Kafka is with extra parameters in 
configure().

public interface SslEngineFactory {
    void configure(Map<String, ?> configs, String clientAuthConfigOverride);
    SSLEngine createSslEngine(String peerHost, int peerPort);
}

With this interface, SslEngineFactory must load its keystore and truststore.
Meanwhile, SslFactory must also load the keystore and truststore to detect a 
future change in configuration.
This slight inefficiency is not important; what truly matters is the lack of
atomicity.
The keystore and truststore might change between the time SslFactory inspects 
them and the time SslEngineFactory loads them again.

There are multiple ways to address this issue:

1. We can ignore the issue. If we always make sure SslFactory inspects the 
files before SslEngineFactory is configured,
SslEngineFactory might load a newer version of the files, and SslFactory 
might unnecessarily recreate the SslEngineFactory
the next time a reconfiguration is called, but we will never miss an update.

2. We can load the keystore and truststore in SslFactory and pass them to 
SslEngineFactory.configure().
The configs for the keystore and truststore are ignored by SslEngineFactory.

public interface SslEngineFactory {
    void configure(Map<String, ?> configs, String clientAuthConfigOverride,
                   KeyStore keystore, KeyStore truststore);
    SSLEngine createSslEngine(String peerHost, int peerPort);
}

Notice we pass the KeyStore and not the SecurityStore since 
SslEngineFactory does not need the metadata.
I find this signature messy.

3. We could query the SslEngineFactory for the keystore and truststore it 
loaded.
This is ugly because SslEngineFactory would need to keep metadata for these 
files,
or we would have to compare the actual KeyStore entries.

4. We could create a new SslEngineFactory for every reconfiguration assuming 
the configuration always changed.
This would create a new SSLContext every time and therefore lose the SSL 
Session cache every time.

Maybe solution 1 is sufficient. Notice we need to load the actual keystore 
early in SslFactory
since we need to fix the lastModified time and the CertificateEntries.
There is a bug currently in the comparison of the CertificateEntries because 
the KeyStore is reloaded
at the time of the comparison, which might compare the new contents against the 
same new contents.

Solution 4 brings up the issue of what counts as a reconfiguration.
I was wondering why Kafka supports only reconfiguration of the keystore and
truststore.
Why not any of the other config settings?

To support reconfiguration of the keystore and truststore and nothing else,
SslFactory would have to pass the original configs to the new SslEngineFactory.
SslFactory would have to keep a copy of the original configs and if we are 
using solution 1,
it must overwrite the settings for the keystore and truststore.
If we do that, we might as well overwrite the clientAuth also and use the ideal 
interface instead.

Thoughts?


Re: [DISCUSS] KIP-390: Add producer option to adjust compression level

2018-11-28 Thread Ismael Juma
Hi Dongjin,

To clarify, I mean a broker topic config with regard to point 1. As you
know, compression can be done by the producer and/or by the broker. The
default is for the broker to just use whatever compression was used by the
producer, but this can be changed by the user on a per-topic basis. It
seems like it would make sense for the configs to be consistent between
the producer and the broker.

For point 2, I haven't looked at the implementation, but we could do it in
the `CompressionType` enum by invoking the right constructor or retrieving
the default value via a constant (if defined). That's an implementation
detail and can be discussed in the PR. The more general point is to rely on
the library defaults instead of choosing one ourselves.

For point 3, I'm in favour of doing that in this KIP.

Ismael

On Wed, Nov 28, 2018 at 7:01 AM Dongjin Lee  wrote:

> Thank you Ismael, here are the answers:
>
> *1. About topic config*
>
> After some consideration, I concluded that topic config doesn't need to
> support compression.level. Here is why: since the compression is conducted
> by the client, the one who can select the best compression level is the
> client itself. Let us assume that the compression level is set at the topic
> config level. In that case, there is a possibility that the compression
> level is not optimal for some producers. Actually, Kafka's go client also
> supports compression level functionality for the producer config only.
>  (wait, do we
> need
> to add this reasoning in the KIP, rejected alternatives section?)
>
> *2. About default level*
>
> As of current draft implementation, the default compression is set on the
> CompressionType enum. Of course, changing this strategy into relying on a
> method from the library to pick the default compression level seems
> possible, like `GZIPBlockOutputStream` does. In this case, we need to add
> similar wrapper class for zstd and modify lz4 the wrapper also. Add to
> this, it seems like we need to explicitly state that we follow the default
> compression level of the codec in the documentation. Is this what you
> intended?
>
> *3. Whether to allow the buffer/block size to be configurable*
>
> Well, As of current draft implementation, the lz4 level is implemented as
> block size; this is caused by my misunderstanding on lz4. After reviewing
> lz4 today, I found that it also supports compression level of 1~16
> (default: 1), not block size. I will fix it in this weekend by updating the
> wrapper class.
>
> For the problem of the buffer/block size, I have no strong opinion. If the
> community needs it, I will do it all together. How do you think?
>
> In short, it seems like I need to update the KIP document for issue #1 and
> update the compression wrapper for issue #2, #3. Is this okay?
>
> Thanks,
> Dongjin
>
> On Wed, Nov 28, 2018 at 12:34 AM Ismael Juma  wrote:
>
> >  Thanks for the KIP, this is helpful. A few questions:
> >
> > 1. Have we considered whether we want to allow a similar topic config?
> > 2. Can we rely on a method from the library to pick the default
> compression
> > level if compression.level is not set? We do it for gzip and it would
> seem
> > reasonable to do something similar for the other compression libraries.
> > 3. Do we want to allow the buffer/block size to be configurable? This has
> > an impact on memory usage and people may want to trade compression for
> > less/more memory in some cases. For example, the default for LZ4 is 64KB
> > which is a bit high.
> >
> > Ismael
> >
> > On Sun, Nov 18, 2018, 2:07 PM Dongjin Lee  >
> > > Hello dev,
> > >
> > > I hope to initiate the discussion of KIP-390: Add producer option to
> > adjust
> > > compression level
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Add+producer+option+to+adjust+compression+level
> > > >.
> > > All feedbacks will be highly appreciated.
> > >
> > > Best,
> > > Dongjin
> > >
> > > --
> > > *Dongjin Lee*
> > >
> > > *A hitchhiker in the mathematical world.*
> > >
> > > *github:  github.com/dongjinleekr
> > > linkedin:
> > kr.linkedin.com/in/dongjinleekr
> > > slideshare:
> > > www.slideshare.net/dongjinleekr
> > > *
> > >
> >
>
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
> *github:  github.com/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck:
> speakerdeck.com/dongjin
> *
>


Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-28 Thread Bill Bejeck
Thanks for the KIP
+1 (non-binding)

-Bill


On Wed, Nov 28, 2018 at 7:06 AM Manikumar  wrote:

> +1 (binding)
> Thanks for the KIP.
>
> Can you update the KIP stating that this option is not supported with
> "--zookeeper" option.
>
> On Mon, Nov 26, 2018 at 11:15 PM Mickael Maison 
> wrote:
>
> > +1 (non-binding)
> > Thanks for the KIP!
> > On Mon, Nov 26, 2018 at 4:32 PM Kevin Lu  wrote:
> > >
> > > Hi All,
> > >
> > > I'm bumping this thread as it has been a couple weeks with no activity.
> > >
> > > The proposed changes in this KIP are minor, but are extremely helpful
> for
> > > operators to immediately identify partitions under min ISR. Please
> take a
> > > couple minutes to review and provide a vote.
> > >
> > > Thanks~
> > >
> > > Regards,
> > > Kevin
> > >
> > > On Thu, Nov 8, 2018 at 1:07 AM Kevin Lu  wrote:
> > >
> > > > Hi All,
> > > >
> > > > I'm starting the vote thread for KIP-351: Add --under-min-isr option
> to
> > > > describe topics command.
> > > >
> > > > KIP:
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
> > > >
> > > > Discussion thread:
> > > >
> >
> https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
> > > >
> > > > Thanks!
> > > >
> > > > Regards,
> > > > Kevin
> > > >
> > > >
> >
>


Re: problems in Kafka unit testing trunk

2018-11-28 Thread lk gen
ulimit shows
$ ulimit -n
1024

Is it too small for Kafka?




On Wed, Nov 28, 2018 at 6:18 AM Dhruvil Shah  wrote:

> The unit test itself does not seem to use too many files. What is the
> output for `ulimit -n` on your system? Running `lsof` might also be helpful
> to determine how many open files you have while Kafka is not running.
>
> - Dhruvil
>
> On Tue, Nov 27, 2018 at 9:20 AM lk gen  wrote:
>
> > When running ./gradlew test
> > on a centos machine with gradle and java set
> > In the trunk version from today
> >
> > There are errors about too many files open of the form
> > "
> > kafka.admin.DeleteTopicTest > testDeletingPartiallyDeletedTopic FAILED
> > org.apache.kafka.common.KafkaException: java.io.IOException: Too many
> > open files
> > at
> > org.apache.kafka.common.network.Selector.<init>(Selector.java:160)
> > at
> > org.apache.kafka.common.network.Selector.<init>(Selector.java:212)
> > at
> > org.apache.kafka.common.network.Selector.<init>(Selector.java:225)
> > at
> >
> >
> kafka.coordinator.transaction.TransactionMarkerChannelManager$.apply(TransactionMarkerChannelManager.scala:66)
> > at
> >
> >
> kafka.coordinator.transaction.TransactionCoordinator$.apply(TransactionCoordinator.scala:62)
> > at kafka.server.KafkaServer.startup(KafkaServer.scala:279)
> > at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
> > at
> >
> >
> kafka.admin.DeleteTopicTest.$anonfun$createTestTopicAndCluster$2(DeleteTopicTest.scala:372)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > at scala.collection.Iterator.foreach(Iterator.scala:937)
> > at scala.collection.Iterator.foreach$(Iterator.scala:937)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> > at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> > at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > at
> scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > at
> scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > at
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
> >
> kafka.admin.DeleteTopicTest.createTestTopicAndCluster(DeleteTopicTest.scala:372)
> > at
> >
> >
> kafka.admin.DeleteTopicTest.createTestTopicAndCluster(DeleteTopicTest.scala:366)
> > at
> >
> >
> kafka.admin.DeleteTopicTest.testDeletingPartiallyDeletedTopic(DeleteTopicTest.scala:418)
> >
> > Caused by:
> > java.io.IOException: Too many open files
> > at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
> > at
> > sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
> > at
> > sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
> > at
> > sun.nio.ch
> > .EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
> > at java.nio.channels.Selector.open(Selector.java:227)
> > at
> > org.apache.kafka.common.network.Selector.<init>(Selector.java:158)
> > ... 20 more
> >
> > "
> >
> > Is the environment I am using for gradle test is invalid ? are there
> > special settings required ?
> >
>


Re: [DISCUSS] KIP-390: Add producer option to adjust compression level

2018-11-28 Thread Dongjin Lee
Thank you Ismael, here are the answers:

*1. About topic config*

After some consideration, I concluded that the topic config doesn't need to
support compression.level. Here is why: since the compression is conducted
by the client, the one who can select the best compression level is the
client itself. Let us assume that the compression level is set at the topic
config level. In that case, there is a possibility that the compression
level is not optimal for some producers. Actually, Kafka's Go client also
supports the compression level functionality for the producer config only.
(Wait, do we need to add this reasoning to the KIP, in the rejected
alternatives section?)
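For illustration, a producer configured under this proposal might look like
the sketch below; note that `compression.level` is only the config proposed in
KIP-390 and does not exist yet, and the bootstrap address is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class CompressionLevelConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put("compression.level", "6"); // proposed knob: per-codec level chosen by the client
        // ...plus key/value serializers, then new KafkaProducer<>(props)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}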

*2. About default level*

As of the current draft implementation, the default compression level is set
on the CompressionType enum. Of course, changing this strategy into relying
on a method from the library to pick the default compression level seems
possible, like `GZIPBlockOutputStream` does. In this case, we need to add a
similar wrapper class for zstd and modify the lz4 wrapper as well. In
addition, it seems like we need to explicitly state in the documentation that
we follow the default compression level of the codec. Is this what you
intended?

*3. Whether to allow the buffer/block size to be configurable*

Well, as of the current draft implementation, the lz4 level is implemented as
a block size; this was caused by my misunderstanding of lz4. After reviewing
lz4 today, I found that it also supports compression levels of 1~16
(default: 1), not block sizes. I will fix it this weekend by updating the
wrapper class.

For the problem of the buffer/block size, I have no strong opinion. If the
community needs it, I will do it all together. What do you think?

In short, it seems like I need to update the KIP document for issue #1 and
update the compression wrapper for issue #2, #3. Is this okay?

Thanks,
Dongjin

On Wed, Nov 28, 2018 at 12:34 AM Ismael Juma  wrote:

>  Thanks for the KIP, this is helpful. A few questions:
>
> 1. Have we considered whether we want to allow a similar topic config?
> 2. Can we rely on a method from the library to pick the default compression
> level if compression.level is not set? We do it for gzip and it would seem
> reasonable to do something similar for the other compression libraries.
> 3. Do we want to allow the buffer/block size to be configurable? This has
> an impact on memory usage and people may want to trade compression for
> less/more memory in some cases. For example, the default for LZ4 is 64KB
> which is a bit high.
>
> Ismael
>
> On Sun, Nov 18, 2018, 2:07 PM Dongjin Lee 
> > Hello dev,
> >
> > I hope to initiate the discussion of KIP-390: Add producer option to
> adjust
> > compression level
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Add+producer+option+to+adjust+compression+level
> > >.
> > All feedbacks will be highly appreciated.
> >
> > Best,
> > Dongjin
> >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> >
> > *github:  github.com/dongjinleekr
> > linkedin:
> kr.linkedin.com/in/dongjinleekr
> > slideshare:
> > www.slideshare.net/dongjinleekr
> > *
> >
>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*
*github:  github.com/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin
*


Re: [DISCUSS] KIP-158: Kafka Connect should allow source connectors to set topic-specific settings for new topics

2018-11-28 Thread Randall Hauch
On Tue, Nov 27, 2018 at 6:31 PM Randall Hauch  wrote:

> Thanks for the feedback. Some thoughts inline.
>
> On Tue, Nov 27, 2018 at 5:47 PM Ewen Cheslack-Postava 
> wrote:
>
>> re: AdminClient vs this proposal, one consideration is that AdminClient
>> exposes a lot more surface area and probably a bunch of stuff we actually
>> don't want Connectors to be able to do, such as deleting topics. You can
>> always lock down by ACLs, but what the framework enables directly vs
>> requiring the user to opt in via connector-specific config is an important
>> distinction.
>>
>> I'm not a fan of how complex the config is (same deal with
>> transformations), and agree with Ryanne that any case requiring multiple
>> rules is probably an outlier. A cleaner option for the common case might
>> be
>> worth it. One option that's still aligned with the current state of the
>> KIP
>> would be to change the default for topic.creation to a fixed default value
>> (e.g. 'default'), and in that case turn the topic.creation.default.regex
>> default to .*, and then the 99% use case would just be specifying the # of
>> partitions with a single config and relying on cluster defaults for the
>> rest. (I would suggest the same thing for transformations if we added a
>> simple scripting transformation such that most use cases wouldn't need to
>> compose multiple transformations.)
>>
>
> I agree that any case requiring multiple rules is probably an outlier, and
> I've been trying to think about how to start simple with a single case but
> leave room if we really do need multiple rules in the future. I like Ewen's
> suggestion a lot. IIUC, it would change the following common case:
>
> topic.creation=default
> topic.creation.default.regex=.*
> topic.creation.default.partitions=5
>
> into the following:
>
> topic.creation.default.partitions=5
>
> where the replication defaults to 3 and all others default to the brokers'
> default topic settings. This is significantly simpler, yet it still allows
> us to handle multiple rules if they're needed.
>
> Another common case is to use compacted topics, so that might require
> adding:
>
> topic.creation.default.cleanup.policy=compact
>
> Also, I like the idea of defaulting the regex to '.*', but I wonder if
> it'd be easier to explain if that default value applied to the *last* rule
> in the list of rules, rather than to apply only to the rule named "default"
> when that's the only named rule. WDYT?
>

Ewen, one disadvantage of this approach is that topic creation in Connect
is enabled by default, rather than being opt-in.



>
>
>>
>> Along related lines, is there actually a need for TopicSettings class? We
>> already have NewTopic in the AdminClient APIs. Does that not suffice?
>>
>
> There are three reasons I added TopicSettings and didn't simply use
> NewTopic. First, NewTopic is immutable, which makes it impractical for
> Connect to pass to a connector and to allow the connector to change it.
> Second, TopicSettings is essentially a builder with easy to use and
> chainable methods, whereas NewTopic relies upon Map<String, String>, String
> constants (in another class), and essentially untyped values. Third,
> NewTopic is not an interface, and I think it's better to expose an
> interface in the connector API when we don't want/expect connectors to
> instantiate the instance and to instead only use what we provide them.
>
> Now, another option is to move TopicSettings into the
> `org.apache.kafka.clients.admin` package and turn it into `NewTopicBuilder`
> instead. Then it'd be useful outside of Connect, but is it strange that
> it's not in the Connect API packages?
>
> Randall
>
>
>>
>> -Ewen
>>
>> On Mon, Sep 24, 2018 at 11:56 AM Andrew Otto  wrote:
>>
>> > FWIW, I’d find this feature useful.
>> >
>> > On Mon, Sep 24, 2018 at 2:42 PM Randall Hauch  wrote:
>> >
>> > > Ryanne,
>> > >
>> > > If your connector is already using the AdminClient, then you as the
>> > > developer have a choice of switching to the new Connect-based
>> > functionality
>> > > or keeping the existing use of the AdminClient. If the connector uses
>> > both
>> > > mechanisms (which I wouldn't recommend, simply because of the
>> complexity
>> > of
>> > > it for a user), then the topic will be created by the first mechanism
>> to
>> > > actually attempt and successfully create the topic(s) in the Kafka
>> > cluster
>> > > that the Connect worker uses. As mentioned in the KIP, "This feature
>> ...
>> > > does not change the topic-specific settings on any existing topics."
>> IOW,
>> > > if the topic already exists, it can't be created again and therefore
>> the
>> > > `topic.creation.*` properties will not apply for that existing topic.
>> > >
>> > > > Do these settings apply to internal topics created by the framework
>> on
>> > > > behalf of a connector, e.g. via KafkaConfigBackingStore?
>> > >
>> > > No, they don't, and I'm happy to add a clarification to the KIP if you
>> > feel
>> > > it is necessary.
>> > >
>> > > > I'd have the same 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-28 Thread Boyang Chen
Thanks Guozhang and Mayuresh for the follow up! Answers are listed below.


>  5. Regarding "So in summary, *the member will only be removed due to
> session timeout*. We shall remove it from both in-memory static member name
> mapping and member list." If the rebalance is invoked manually using the
> the admin apis, how long should the group coordinator wait for the members
> of the group to send a JoinGroupRequest for participating in the rebalance?
> How is a lagging consumer handled?

Great question. Let's use c1~c4 example here:

  1.  Consumers c1, c2, c3, c4 are in a stable state.
  2.  c4 goes down, and we detect this before the session timeout through 
client-side monitoring. We initiate a ConsumerRebalanceRequest.
  3.  A rebalance kicks off, and after the rebalance timeout we keep the same 
assignment for c1~4, provided the session timeout for c4 hasn't been reached yet.
  4.  The group goes back to stable with c1~4 (although c4 is actually offline).
  5.  c4's session timeout is finally reached: another rebalance is triggered.

For step 3, if the session timeout is triggered within the rebalance timeout, 
only c1~3 will participate in the rebalance. This is what we mean by saying 
"rebalance timeout shall not remove current members; only session timeout will do."
As you can see, this is not an ideal scenario: we trigger an extra rebalance at 
step 5. In my reply to Guozhang I'm asking whether we should still let static 
members send a LeaveGroupRequest as a signal to the broker saying "I'm currently 
offline", so that when we send the ConsumerRebalanceRequest to the broker, it 
can actually kick c4 out because c4 has already declared itself offline, saving 
one or more additional rebalances later. This way the ConsumerRebalanceRequest 
will be more effective in making a correct judgement on the group status, since 
we have more feedback from the client side.
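
(A rough Java-style sketch of the broker-side behavior I have in mind -- the
type and method names here are assumptions for illustration, not the KIP's
actual wording:)

// When the proposed admin-triggered rebalance request arrives, kick out any
// static member that has already signalled "offline" via LeaveGroupRequest,
// instead of keeping it in the group until its session timeout expires.
void onAdminTriggeredRebalance(GroupMetadata group) {
    for (MemberMetadata member : group.staticMembers()) {
        if (member.hasSignalledOffline()) {
            group.remove(member);   // avoids the extra rebalance at step 5 above
        }
    }
    group.triggerRebalance();
}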

> - When we say that we would use invokeConsumerRebalance(groupId) to down
> scale, with the example in the above question, how will the
> GroupCoordinator know that c4 should be kicked out of the group since we
> are trying to invoke rebalance proactively without waiting for c4's session
> time out to expire. Should there be a way of telling the GroupCoordinator
> that consumer c4 has been kicked out of the groupId = "GroupA"?
The previous proposal should suffice to answer this question.

- Also it looks like the statement "If the `member.id` uses
> UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> the one within current map, if `group.member.name` is known. Also once we
> are done with KIP-394
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
> >,
> all the join group requests are requiring `member.id` to physically enter
> the consumer group. This way the latest joined " is incomplete. Can you
> take a look at this?
> Also when we say "all the join group requests are requiring `member.id` to
> physically enter the consumer group." because a newly started consumer will
> not have a "member.id", I assume you mean, once the GroupCoordinator
> assigns a member.id to the newly started consumer, it has to use it for
> any
> future JoinGroupRequests. Is my understanding correct?
>
Thanks for catching it! And yes, we shall use one extra round trip between the 
consumer and the broker to inform the consumer of its newly allocated member id.

Next are my replies to Guozhang's comments:
2) I once had a discussion about the LeaveGroupRequest for static members,
and the reason for not having it for static members is that we'd need to
make it a configurable behavior as well (i.e. the likelihood that a static
member may shut down but come back later may be even larger than the
likelihood that a shut-down static member would not come back), and when a
shutdown is complete the instance cannot tell by itself whether or not it
will come back. Hence letting a third party (think: an admin tool used by
K8s plugins) issue a request to indicate static member changes would be more
plausible.

I think having, in the request, an optional list of all the static members
that are still in the group (rather than the members to be removed, since the
latter looks a bit less flexible to me) is a good idea. Remember we allow a
group to have both static and dynamic members at the same time, so when
receiving the request we would only compute the diff and add/remove the
static members directly, while still letting the dynamic members try to
re-join the group within the rebalance timeout.
I'm also in favor of storing all the in-group static members. In fact we could 
reuse the static membership mapping to store this information. Do you think 
that we should let static members send a LeaveGroupRequest to indicate their 
status

[jira] [Created] (KAFKA-7685) Support loading trust stores from classpath

2018-11-28 Thread Noa Resare (JIRA)
Noa Resare created KAFKA-7685:
-

 Summary: Support loading trust stores from classpath
 Key: KAFKA-7685
 URL: https://issues.apache.org/jira/browse/KAFKA-7685
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.1.0
Reporter: Noa Resare


Certificate pinning, as well as authenticating Kafka brokers using a non-public 
CA certificate maintained inside an organisation, is desirable to a lot of 
users. This can be accomplished today using the {{ssl.truststore.location}} 
configuration property. Unfortunately, this value is always interpreted as a 
filesystem path, which makes distribution of such an alternative truststore a 
needlessly cumbersome process. If we had the ability to load a trust store from 
the classpath as well as from a file, the trust store could be shipped in a jar 
that could be declared as a regular maven-style dependency.

If we did this by supporting a {{classpath:}} prefix for 
{{ssl.truststore.location}}, this would be a backwards-compatible change, one 
that builds on prior design patterns established by, for example, the Spring 
project.
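
A minimal sketch of what the resolution logic could look like under this
convention (illustrative only, not an actual patch; the class and method
names are made up):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class TrustStoreLocations {
    // Resolve ssl.truststore.location either from the classpath or the filesystem.
    public static InputStream open(String location) throws IOException {
        String prefix = "classpath:";
        if (location.startsWith(prefix)) {
            String resource = location.substring(prefix.length());
            InputStream in = Thread.currentThread()
                    .getContextClassLoader()
                    .getResourceAsStream(resource);
            if (in == null) {
                throw new IOException("Truststore not found on classpath: " + resource);
            }
            return in;
        }
        // Current behaviour: treat the value as a filesystem path.
        return new FileInputStream(location);
    }
}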



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-351: Add --under-min-isr option to describe TopicCommand

2018-11-28 Thread Manikumar
+1 (binding)
Thanks for the KIP.

Can you update the KIP to state that this option is not supported with the
"--zookeeper" option?

On Mon, Nov 26, 2018 at 11:15 PM Mickael Maison 
wrote:

> +1 (non-binding)
> Thanks for the KIP!
> On Mon, Nov 26, 2018 at 4:32 PM Kevin Lu  wrote:
> >
> > Hi All,
> >
> > I'm bumping this thread as it has been a couple weeks with no activity.
> >
> > The proposed changes in this KIP are minor, but are extremely helpful for
> > operators to immediately identify partitions under min ISR. Please take a
> > couple minutes to review and provide a vote.
> >
> > Thanks~
> >
> > Regards,
> > Kevin
> >
> > On Thu, Nov 8, 2018 at 1:07 AM Kevin Lu  wrote:
> >
> > > Hi All,
> > >
> > > I'm starting the vote thread for KIP-351: Add --under-min-isr option to
> > > describe topics command.
> > >
> > > KIP:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-351%3A+Add+--under-min-isr+option+to+describe+topics+command
> > >
> > > Discussion thread:
> > >
> https://lists.apache.org/thread.html/90d1652ebc03a7be4100dd101b92a7dcefe63d144856c5f6c132381b@%3Cdev.kafka.apache.org%3E
> > >
> > > Thanks!
> > >
> > > Regards,
> > > Kevin
> > >
> > >
>


Re: [DISCUSS] KIP-394: Require member.id for initial join group request

2018-11-28 Thread Stanislav Kozlovski
Hey Matthias,

I think the notion is to have `session.timeout.ms` start ticking when the
broker responds with the member.id. Then the broker would properly expire
consumers and not hold on to too many stale ones.
This isn't mentioned in the KIP though, so it is worth waiting for Boyang to
confirm.
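
(To spell out my understanding in broker-side pseudocode -- the names below
are illustrative assumptions, not the KIP text:)

// First JoinGroup with UNKNOWN_MEMBER_ID (after the proposed protocol bump):
// generate a member.id, track it with an expiration of session.timeout.ms,
// and reply with MEMBER_ID_REQUIRED plus the new member.id. The member only
// enters the group when it re-sends JoinGroup carrying that id; if the second
// request never arrives, the pending member.id simply expires instead of
// accumulating as a stale entry in the coordinator's cache.
JoinGroupResponse handleJoinGroup(JoinGroupRequest request) {
    if (request.memberId().equals(UNKNOWN_MEMBER_ID)) {
        String newMemberId = generateMemberId();
        pendingMembers.put(newMemberId, now() + request.sessionTimeoutMs());
        return errorResponse(Errors.MEMBER_ID_REQUIRED, newMemberId);
    }
    return addMemberAndMaybeRebalance(request);
}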

On Wed, Nov 28, 2018 at 3:10 AM Matthias J. Sax 
wrote:

> Thanks for the KIP Boyang.
>
> I guess I am missing something, but I am still learning more details
> about the rebalance protocol, so maybe you can help me out?
>
> Assume a client sends UNKNOWN_MEMBER_ID in its first joinGroup request.
> The broker generates a `member.id` and sends it back via
> `MEMBER_ID_REQUIRED` error response. This response might never reach the
> client or the client fails before it can send the second joinGroup
> request. Thus, a client would need to start over with a new
> UNKNOWN_MEMBER_ID in its joinGroup request. Thus, the broker needs to
> generate a new `member.id` again.
>
> So it seems the problem is moved, but not resolved? The motivation of
> the KIP is:
>
> > The edge case is that if initial join group request keeps failing due to
> connection timeout, or the consumer keeps restarting,
>
> From my understanding, this KIP moves the issue from the first to the
> second joinGroup request (or the broker's joinGroup response).
>
> But maybe I am missing something. Can you help me out?
>
>
> -Matthias
>
>
> On 11/27/18 6:00 PM, Boyang Chen wrote:
> > Thanks Stanislav and Jason for the suggestions!
> >
> >
> >> Thanks for the KIP. Looks good overall. I think we will need to bump the
> >> version of the JoinGroup protocol in order to indicate compatibility
> with
> >> the new behavior. The coordinator needs to know when it is safe to
> assume
> >> the client will handle the error code.
> >>
> >> Also, I was wondering if we could reuse the REBALANCE_IN_PROGRESS error
> >> code. When the client sees this error code, it will take the memberId
> from
> >> the response and rejoin. We'd still need the protocol bump since older
> >> consumers do not have this logic.
> >
> > I will add the join group protocol version change to the KIP. Meanwhile
> I feel for
> > understandability it's better to define a separate error code since
> REBALANCE_IN_PROGRESS
> > is not the actual cause of the returned error.
> >
> >> One small question I have is now that we have one and a half round-trips
> >> needed to join in a rebalance (1 full RT addition), is it worth it to
> >> consider increasing the default value of `
> group.initial.rebalance.delay.ms`?
> > I guess we could keep it for now. After KIP-345 and incremental
> cooperative rebalancing
> > work we should be safe to deprecate `group.initial.rebalance.delay.ms`.
> Also one round trip
> > shouldn't increase the latency too much IMO.
> >
> > Best,
> > Boyang
> > 
> > From: Stanislav Kozlovski 
> > Sent: Wednesday, November 28, 2018 2:32 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join
> group request
> >
> > Hi Boyang,
> >
> > The KIP looks very good.
> > One small question I have is now that we have one and a half round-trips
> > needed to join in a rebalance (1 full RT addition), is it worth it to
> > consider increasing the default value of `
> group.initial.rebalance.delay.ms`?
> >
> > Best,
> > Stanislav
> >
> > On Tue, Nov 27, 2018 at 5:39 PM Jason Gustafson 
> wrote:
> >
> >> Hi Boyang,
> >>
> >> Thanks for the KIP. Looks good overall. I think we will need to bump the
> >> version of the JoinGroup protocol in order to indicate compatibility
> with
> >> the new behavior. The coordinator needs to know when it is safe to
> assume
> >> the client will handle the error code.
> >>
> >> Also, I was wondering if we could reuse the REBALANCE_IN_PROGRESS error
> >> code. When the client sees this error code, it will take the memberId
> from
> >> the response and rejoin. We'd still need the protocol bump since older
> >> consumers do not have this logic.
> >>
> >> Thanks,
> >> Jason
> >>
> >> On Mon, Nov 26, 2018 at 5:47 PM Boyang Chen 
> wrote:
> >>
> >>> Hey friends,
> >>>
> >>>
> >>> I would like to start a discussion thread for KIP-394 which is trying
> to
> >>> mitigate broker cache bursting issue due to anonymous join group
> >> requests:
> >>>
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
> >>>
> >>>
> >>> Thanks!
> >>>
> >>> Boyang
> >>>
> >>
> >
> >
> > --
> > Best,
> > Stanislav
> >
>
>

-- 
Best,
Stanislav


Jenkins build is back to normal : kafka-trunk-jdk11 #120

2018-11-28 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-2.1-jdk8 #63

2018-11-28 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

--
[...truncated 2.63 MB...]

(org.apache.kafka.streams.TopologyTest results elided: all listed tests STARTED and PASSED)


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2018-11-28 Thread Stanislav Kozlovski
Hi Jason,

You raise some very valid points.

> The benefit of this KIP is probably limited to preventing "runaway"
consumer groups due to leaks or some other application bug
What do you think about the use case I mentioned in my previous reply about
a more resilient, self-service Kafka? I believe the benefit there is bigger.

* Default value
You're right, we probably do need to be conservative. Big consumer groups
are considered an anti-pattern and my goal was to also hint at this through
the config's default. Regardless, it is better to not have the potential to
break applications with an upgrade.
Choosing between a big default like 5000 and an opt-in option, I think we
should go with the *disabled default option* (-1).
The only benefit we would get from a big default of 5000 is default
protection against buggy/malicious applications that hit the KAFKA-7610
issue.
While this KIP was spawned from that issue, I believe its value is enabling
the possibility of protection and helping move towards a more self-service
Kafka. I also think that a default value of 5000 might be misleading to
users and lead them to think that big consumer groups (> 250) are a good
thing.
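
Concretely, the disabled default mentioned above would look something like the
following in the broker config (with -1 meaning "no limit unless the operator
sets one"):

group.max.size=-1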

The good news is that KAFKA-7610 should be fully resolved and the rebalance
protocol should, in general, be more solid after the planned improvements
in KIP-345 and KIP-394.

* Handling bigger groups during upgrade
I now see that we store the state of consumer groups in the log and why a
rebalance isn't expected during a rolling upgrade.
Since we're going with the default value of the max.size being disabled, I
believe we can afford to be more strict here.
During state reloading on a new Coordinator with a defined group.max.size
config, I believe we should *force* rebalances for groups that exceed the
configured size. Then only some consumers will be able to rejoin and the max
size invariant will be satisfied.
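
(A Java-style sketch of the check I have in mind -- names are assumptions,
not the final implementation:)

// After a coordinator change, once the group state has been reloaded from the
// log and group.max.size is enabled, force a rebalance for oversized groups.
// During that rebalance only groupMaxSize members are allowed back in; the
// rest are rejected, so the invariant is re-established.
void onGroupLoaded(GroupMetadata group, int groupMaxSize) {
    if (groupMaxSize > 0 && group.size() > groupMaxSize) {
        group.triggerRebalance("group exceeds group.max.size=" + groupMaxSize);
    }
}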

I updated the KIP with a migration plan, rejected alternatives and the new
default value.

Thanks,
Stanislav

On Tue, Nov 27, 2018 at 5:25 PM Jason Gustafson  wrote:

> Hey Stanislav,
>
> Clients will then find that coordinator
> > and send `joinGroup` on it, effectively rebuilding the group, since the
> > cache of active consumers is not stored outside the Coordinator's memory.
> > (please do say if that is incorrect)
>
>
> Groups do not typically rebalance after a coordinator change. You could
> potentially force a rebalance if the group is too big and kick out the
> slowest members or something. A more graceful solution is probably to just
> accept the current size and prevent it from getting bigger. We could log a
> warning potentially.
>
> My thinking is that we should abstract away from conserving resources and
> > focus on giving control to the broker. The issue that spawned this KIP
> was
> > a memory problem but I feel this change is useful in a more general way.
>
>
> So you probably already know why I'm asking about this. For consumer groups
> anyway, resource usage would typically be proportional to the number of
> partitions that a group is reading from and not the number of members. For
> example, consider the memory use in the offsets cache. The benefit of this
> KIP is probably limited to preventing "runaway" consumer groups due to
> leaks or some other application bug. That still seems useful though.
>
> I completely agree with this and I *ask everybody to chime in with opinions
> > on a sensible default value*.
>
>
> I think we would have to be very conservative. The group protocol is
> generic in some sense, so there may be use cases we don't know of where
> larger groups are reasonable. Probably we should make this an opt-in
> feature so that we do not risk breaking anyone's application after an
> upgrade. Either that, or use a very high default like 5,000.
>
> Thanks,
> Jason
>
> On Tue, Nov 27, 2018 at 3:27 AM Stanislav Kozlovski <
> stanis...@confluent.io>
> wrote:
>
> > Hey Jason and Boyang, those were important comments
> >
> > > One suggestion I have is that it would be helpful to put your reasoning
> > on deciding the current default value. For example, in certain use cases
> at
> > Pinterest we are very likely to have more consumers than 250 when we
> > configure 8 stream instances with 32 threads.
> > > For the effectiveness of this KIP, we should encourage people to
> discuss
> > their opinions on the default setting and ideally reach a consensus.
> >
> > I completely agree with this and I *ask everybody to chime in with
> opinions
> > on a sensible default value*.
> > My thought process was that in the current model rebalances in large
> groups
> > are more costly. I imagine most use cases in most Kafka users do not
> > require more than 250 consumers.
> > Boyang, you say that you are "likely to have... when we..." - do you have
> > systems running with so many consumers in a group or are you planning
> to? I
> > guess what I'm asking is whether this has been tested in production with
> > the 

[jira] [Resolved] (KAFKA-6149) LogCleanerManager should include topic partition name when warning of invalid cleaner offset

2018-11-28 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-6149.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.1
   1.0.0

This was fixed in 1.0 and 0.11.0.1+ releases

> LogCleanerManager should include topic partition name when warning of invalid 
> cleaner offset 
> -
>
> Key: KAFKA-6149
> URL: https://issues.apache.org/jira/browse/KAFKA-6149
> Project: Kafka
>  Issue Type: Improvement
>  Components: log, logging
>Reporter: Ryan P
>Priority: Major
> Fix For: 1.0.0, 0.11.0.1
>
>
> The following message would be a lot more helpful if the topic partition name 
> were included.
> if (!isCompactAndDelete(log))
>   warn(s"Resetting first dirty offset to log start offset 
> $logStartOffset since the checkpointed offset $offset is invalid.")



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk8 #3222

2018-11-28 Thread Apache Jenkins Server
See 




Build failed in Jenkins: kafka-trunk-jdk11 #119

2018-11-28 Thread Apache Jenkins Server
See 


Changes:

[me] KAFKA-7620: Fix restart logic for TTLs in WorkerConfigTransformer

--
[...truncated 2.26 MB...]
(org.apache.kafka.trogdor test results elided: all listed tests STARTED and PASSED; output cut off mid-listing)