Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2019-01-09 Thread Stanislav Kozlovski
Sounds good! I've updated the KIP with another small section under
Motivation.
If there aren't any objections or further recommendations, I plan on
starting a VOTE thread in the following days.

Best,
Stanislav

On Wed, Jan 9, 2019 at 8:54 PM Gwen Shapira  wrote:

> Thanks for the data driven approach, Stanislav. I love it :)
> And thank you for sharing your formula, Boyang. I totally agree that
> rebalance latency will not grow linearly with the consumer group size.
>
> My recommendation, considering what we know today:
> 1. Add the limit config, and set it to MAX_INT by default (effectively
> unlimited, without a magic number like -1)
> 2. Document our thoughts - the concern about runaway groups,
> Pinterest's 500 limit, Confluent's experience with few thousand
> consumers in a group, the conclusions from Stanislav's memory research
> (Personally, I wouldn't want what is essentially a linked list that we
> iterate to grow beyond 1M).
>
> Most likely, 99% of the users won't need it and those who do will
> have the right information to figure things out (or at least, they'll
> know everything that we know).
>
> WDYT?
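
As a purely illustrative aside on Gwen's suggestion above: a minimal sketch of how such a broker config could be declared with Kafka's ConfigDef, assuming the key is named group.max.size and defaults to MAX_INT; the surrounding class and method names are hypothetical.

```
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;

public class GroupMaxSizeConfigSketch {
    // Hypothetical key and doc string; the KIP defines the real ones.
    public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size";
    public static final String GROUP_MAX_SIZE_DOC =
        "The maximum number of members a single consumer group can hold.";

    public static ConfigDef withGroupMaxSize(ConfigDef def) {
        // Integer.MAX_VALUE keeps the broker effectively unlimited by default,
        // without resorting to a magic number like -1.
        return def.define(GROUP_MAX_SIZE_CONFIG,
                          Type.INT,
                          Integer.MAX_VALUE,
                          Range.atLeast(1),
                          Importance.MEDIUM,
                          GROUP_MAX_SIZE_DOC);
    }
}
```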
>
> On Wed, Jan 9, 2019 at 4:25 AM Stanislav Kozlovski
>  wrote:
> >
> > Hey everybody,
> >
> > I ran a quick benchmark and took some heap dumps to gauge how much memory
> > each consumer in a group is using, all done locally.
> > The setup was the following: 10 topics with 10 partitions each (100
> > partitions total) and one consumer group with 10 members, then expanded
> to
> > 20 members.
> > Here are some notes of my findings in a public Google doc:
> >
> https://docs.google.com/document/d/1Z4aY5qg8lU2uNXzdgp_30_oJ9_I9xNelPko6GIQYXYk/edit?usp=sharing
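
For anyone who wants to reproduce a setup like this locally, here is a rough sketch of spinning up N consumers in one group subscribed to 10 topics; the topic names, group id, and bootstrap address are assumptions, and the heap dumps themselves would be taken separately with standard JDK tooling.

```
import java.time.Duration;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerGroupMemoryBench {
    public static void main(String[] args) {
        // Number of group members to start: 10 first, then 20.
        int members = args.length > 0 ? Integer.parseInt(args[0]) : 10;
        for (int i = 0; i < members; i++) {
            new Thread(ConsumerGroupMemoryBench::runConsumer, "member-" + i).start();
        }
    }

    static void runConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
        props.put("group.id", "memory-bench-group");         // hypothetical group name
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to topic-0 .. topic-9, each assumed to have 10 partitions.
            consumer.subscribe(IntStream.range(0, 10)
                    .mapToObj(i -> "topic-" + i)
                    .collect(Collectors.toList()));
            while (true) {
                consumer.poll(Duration.ofMillis(500));   // keep the member alive in the group
            }
        }
    }
}
```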
> >
> >
> > On Mon, Jan 7, 2019 at 10:51 PM Boyang Chen  wrote:
> >
> > > Hey Stanislav,
> > >
> > > I think the time taken to rebalance is not linearly correlated with
> number
> > > of consumers with our application. As for our current and future use
> cases,
> > > the main concern for Pinterest is still on the broker memory not CPU,
> > > because crashing server by one application could have cascading effect
> on
> > > all jobs.
> > > Do you want to drive a more detailed formula on how to compute the
> memory
> > > consumption against number of consumers within the group?
> > >
> > > In the meantime, I'm pretty much bought into the motivation of this KIP, so I
> > > think the follow-up work is just refinement to make the new config easy to
> > > use. We should be good to vote, IMO.
> > >
> > > Best,
> > > Boyang
> > > 
> > > From: Stanislav Kozlovski 
> > > Sent: Monday, January 7, 2019 4:21 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > > metadata growth
> > >
> > > Hey there,
> > >
> > > Per Gwen's comments, I slightly reworked the motivation section. Let me
> > > know if it's any better now
> > >
> > > I completely agree that it would be best if we were to add a
> recommended
> > > number to a typical consumer group size. There is a problem that
> timing the
> > > CPU usage and rebalance times of consumer groups is tricky. We can
> update
> > > the KIP with memory guidelines (e.g 1 consumer in a group uses X
> memory,
> > > therefore 100 use Y).
> > > I fear that the most useful recommendations though would be knowing
> the CPU
> > > impact of large consumer groups and the rebalance times. That is,
> > > unfortunately, tricky to test and measure.
> > >
> > > @Boyang, you had mentioned some numbers used in Pinterest. If
> available to
> > > you, would you be comfortable sharing the number of consumers you are
> using
> > > in a group and maybe the potential time it takes to rebalance it?
> > >
> > > I'd appreciate any anecdotes regarding consumer group sizes from the
> > > community
> > >
> > > Best,
> > > Stanislav
> > >
> > > On Thu, Jan 3, 2019 at 1:59 AM Boyang Chen 
> wrote:
> > >
> > > > Thanks Gwen for the suggestion! +1 on the guidance of defining
> > > > group.max.size. I guess a sample formula would be:
> > > > 2 * (# of brokers * average metadata cache size * 80%) / (# of
> consumer
> > > > groups * size of a single member metadata)
> > > >
> > > > if we assumed non-skewed partition assignment and pretty fair
> consumer
> > > > group consumption. The "2" is the 95th percentile of the normal distribution
> > > > and 80% is just to buffer some memory capacity, which are both open to
> > > > discussion. This config should be useful for Kafka platform team to
> make
> > > > sure one extreme large consumer group won't bring down the whole
> cluster.
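
Just to make the arithmetic of that formula concrete, here is a toy example with made-up numbers; none of these constants are recommendations.

```
public class GroupSizeBudgetSketch {
    public static void main(String[] args) {
        // All values below are placeholders, purely to show the arithmetic.
        int brokers = 10;
        long avgMetadataCacheBytes = 1L << 30;   // assume ~1 GiB of metadata cache per broker
        double headroomFactor = 0.8;             // keep 20% of that in reserve
        int consumerGroups = 200;
        long memberMetadataBytes = 10_000;       // assume ~10 KB per group member

        double maxGroupSize = 2 * (brokers * avgMetadataCacheBytes * headroomFactor)
                / ((double) consumerGroups * memberMetadataBytes);
        // With these placeholders: 2 * (10 * 2^30 * 0.8) / (200 * 10,000) ~= 8,590 members.
        System.out.printf("suggested group.max.size upper bound: %.0f members%n", maxGroupSize);
    }
}
```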
> > > >
> > > > What do you think?
> > > >
> > > > Best,
> > > > Boyang
> > > >
> > > > 
> > > > From: Gwen Shapira 
> > > > Sent: Thursday, January 3, 2019 2:59 AM
> > > > To: dev
> > > > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > > > metadata growth
> > > >
> > > > Sorry for joining the fun late, but I think the problem we a

Re: [DISCUSS] KIP-362: Dynamic Session Window Support

2019-01-09 Thread Guozhang Wang
Hello Lei,

Just checking what the current status of this KIP is. We have a KIP deadline
for 2.2 on the 24th and are wondering if this one will be able to make it.


Guozhang

On Sat, Dec 15, 2018 at 1:01 PM Lei Chen  wrote:

> Sorry for the late reply Matthias. Have been busy with other work recently.
> I'll restart the discussion and update the KIP accordingly.
>
> Lei
>
> On Tue, Nov 6, 2018 at 3:11 PM Matthias J. Sax 
> wrote:
>
> > Any update on this KIP?
> >
> > On 9/20/18 3:30 PM, Matthias J. Sax wrote:
> > > Thanks for following up. Very nice examples!
> > >
> > > I think, that the window definition for Flink is semantically
> > > questionable. If there is only a single record, why is the window
> > > defined as [ts, ts+gap]? To me, this definition is not sound and seems
> > > to be arbitrary. To define the windows as [ts-gap,ts+gap] as you
> mention
> > > would be semantically more useful -- still, I think that defining the
> > > window as [ts,ts] as we do currently in Kafka Streams is semantically
> > > the best.
> > >
> > > I have the impression that Flink only defines them differently because
> > > it solves the issues in the implementation. (I.e., an implementation
> > > detail leaks into the semantics, which is usually not desired.)
> > >
> > > However, I believe that we could change the implementation accordingly.
> > > We could store the windowed keys, as [ts-gap,ts+gap] (or [ts,ts+gap])
> in
> > > RocksDB, but at API level we return [ts,ts]. This way, we can still
> find
> > > all windows we need and provide the same deterministic behavior and
> keep
> > > the current window boundaries on the semantic level (there is no need
> to
> > > store the window start and/or end time). With this technique, we can
> > > also implement dynamic session gaps. I think, we would need to store
> the
> > > used "gap" for each window, too. But again, this would be an
> > > implementation detail.
> > >
> > > Let's see what others think.
> > >
> > > One tricky question we would need to address is, how we can be backward
> > > compatible. I am currently working on KIP-258 that should help to
> > > address this backward compatibility issue though.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> > > On 9/19/18 5:17 PM, Lei Chen wrote:
> > >> Thanks Matthias. That makes sense.
> > >>
> > >> You're right that symmetric merge is necessary to ensure consistency.
> On
> > >> the other hand, I kinda feel it defeats the purpose of dynamic gap,
> > which
> > >> is to update the gap from old value to new value. The symmetric merge
> > >> always honor the larger gap in both direction, rather than honor the
> gap
> > >> carried by record with larger timestamp. I wasn't able to find any
> > semantic
> > >> definitions w.r.t this particular aspect online, but spent some time
> > >> looking into other streaming engines like Apache Flink.
> > >>
> > >> Apache Flink defines the window differently, that uses (start time,
> > start
> > >> time + gap).
> > >>
> > >> so our previous example (10, 10), (19,5),(15,3) in Flink's case will
> be:
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >>
> > >> while example (15,3)(19,5)(10,10) will be
> > >> [15,18]
> > >> [19,24] => no merge
> > >> [10,20] => merged to [10,24]
> > >>
> > >> however, since it only records the gap in the future direction, not the past, a late
> > >> record might not trigger any merge where a symmetric merge would.
> > >> (7,2),(10, 10), (19,5),(15,3)
> > >> [7,9]
> > >> [10,20]
> > >> [19,24] => merged to [10,24]
> > >> [15,18] => merged to [10,24]
> > >> so at the end
> > >> two windows [7,9][10,24] are there.
> > >>
> > >> As you can see, in Flink, the gap semantic is more toward the way that
> > >> a gap carried by one record only affects how this record merges with future
> > >> records. E.g., a later event (T2, G2) will only be merged with (T1, G1) if
> > >> T2 is less than T1+G1, but not when T1 is less than T2 - G2. Let's call
> > >> this the "forward-merge" way of handling this. I just went through some source
> > >> code, and if my understanding of Flink's implementation is incorrect,
> > >> please correct me.
> > >>
> > >> On the other hand, if we want to do symmetric merge in Kafka Streams,
> we
> > >> can change the window definition to [start time - gap, start time +
> > gap].
> > >> This way the example (7,2),(10, 10), (19,5),(15,3) will be
> > >> [5,9]
> > >> [0,20] => merged to [0,20]
> > >> [14,24] => merged to [0,24]
> > >> [12,18] => merged to [0,24]
> > >>
> > >>  (19,5),(15,3)(7,2),(10, 10) will generate same result
> > >> [14,24]
> > >> [12,18] => merged to [12,24]
> > >> [5,9] => no merge
> > >> [0,20] => merged to [0,24]
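
To make the symmetric-merge semantics above concrete, here is a small standalone sketch (plain Java, not Kafka Streams APIs) that treats each record (ts, gap) as the interval [ts-gap, ts+gap] and merges overlapping intervals; both arrival orders from the example end with the single window [0,24].

```
import java.util.ArrayList;
import java.util.List;

// A toy model of the "symmetric merge" discussed above; illustration only.
public class SymmetricSessionMergeSketch {

    static final class Window {
        long start, end;
        Window(long start, long end) { this.start = start; this.end = end; }
        boolean overlaps(Window other) { return start <= other.end && other.start <= end; }
        public String toString() { return "[" + start + "," + end + "]"; }
    }

    static List<Window> process(long[][] records) {
        List<Window> windows = new ArrayList<>();
        for (long[] r : records) {
            // Record (ts, gap) opens the window [ts - gap, ts + gap].
            Window w = new Window(r[0] - r[1], r[0] + r[1]);
            List<Window> next = new ArrayList<>();
            for (Window existing : windows) {
                if (w.overlaps(existing)) {
                    // Absorb every existing window that overlaps the new one.
                    w.start = Math.min(w.start, existing.start);
                    w.end = Math.max(w.end, existing.end);
                } else {
                    next.add(existing);
                }
            }
            next.add(w);
            windows = next;
        }
        return windows;
    }

    public static void main(String[] args) {
        // Both arrival orders from the example collapse to the single window [0,24].
        System.out.println(process(new long[][]{{7, 2}, {10, 10}, {19, 5}, {15, 3}}));
        System.out.println(process(new long[][]{{19, 5}, {15, 3}, {7, 2}, {10, 10}}));
    }
}
```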
> > >>
> > >> Note that symmetric-merge would require us to change the way Kafka
> > >> Streams fetches windows now: instead of fetching the range from timestamp-gap to
> > >> timestamp+gap, we will need to fetch all windows that are not expired yet.
> > >> On the othe

[jira] [Created] (KAFKA-7809) Getting uncaught exception in kafka-producer-network-thread | producer-7686: java.lang.OutOfMemoryError: Java heap space

2019-01-09 Thread Suman Kalyan Ghosh (JIRA)
Suman Kalyan Ghosh created KAFKA-7809:
-

 Summary: Getting uncaught exception in 
kafka-producer-network-thread | producer-7686: java.lang.OutOfMemoryError: Java 
heap space
 Key: KAFKA-7809
 URL: https://issues.apache.org/jira/browse/KAFKA-7809
 Project: Kafka
  Issue Type: Bug
  Components: log
 Environment: Performance
Reporter: Suman Kalyan Ghosh


Getting uncaught exception in kafka-producer-network-thread | producer-7686: 
java.lang.OutOfMemoryError: Java heap space 

Env: Performance environment

Setup: 3-node single cluster 

Logs : 


Jan 09, 2019 5:09:09 AM org.apache.kafka.common.utils.KafkaThread$1 
uncaughtException
SEVERE: Uncaught exception in kafka-producer-network-thread | producer-7686:
java.lang.OutOfMemoryError: Java heap space
 at java.lang.AbstractStringBuilder.<init>(AbstractStringBuilder.java:68)
 at java.lang.StringBuilder.<init>(StringBuilder.java:89)
 at java.net.InetAddress.toString(InetAddress.java:725)
 at 
org.apache.kafka.common.network.KafkaChannel.socketDescription(KafkaChannel.java:117)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:305)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
 at java.lang.Thread.run(Thread.java:748)

  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-412: Extend Admin API to support dynamic application log levels

2019-01-09 Thread Stanislav Kozlovski
Sorry about cutting the last message short. I meant to say that in
the future we would be able to introduce finer-grained logging
configuration (e.g. enable debug logs for operations pertaining to a specific
topic) and that would be easier to do if we know what the target
resource of an IncrementalAlterConfig request is.

Separating the resource types also allows us to not return a huge
DescribeConfigs response on the BROKER resource type - the logging
configurations can be quite verbose.
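
As a concrete illustration of the "deny all requests that try to alter log level" policy case, here is a minimal sketch of an AlterConfigPolicy implementation; it assumes the new resource type is eventually exposed as ConfigResource.Type.BROKER_LOGGER, which is not something the KIP has finalized.

```
import java.util.Map;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

// Illustrative only: rejects any attempt to change broker log levels.
public class DenyLoggerChangesPolicy implements AlterConfigPolicy {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        // BROKER_LOGGER is an assumed name for the new resource type proposed by the KIP.
        if (requestMetadata.resource().type() == ConfigResource.Type.BROKER_LOGGER) {
            throw new PolicyViolationException(
                "Changing broker log levels is not allowed by this policy");
        }
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}
```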

I hope that answers your question

Best,
Stanislav

On Wed, Jan 9, 2019 at 3:26 PM Stanislav Kozlovski 
wrote:

> Hey Ryanne, thanks for taking a look at the KIP!
>
> I think that it is useful to specify the distinction between a standard
> Kafka config and the log level configs. The log level can be looked at as a
> separate resource as it does not change the behavior of the Kafka broker in
> any way.
> In terms of practical benefits, separating the two significantly eases this KIP's
> implementation and users' implementations of AlterConfigPolicy (e.g. deny all
> requests that try to alter log levels). We would also be able
> to introduce a
>
> On Wed, Jan 9, 2019 at 1:48 AM Ryanne Dolan  wrote:
>
>> > To differentiate between the normal Kafka config settings and the
>> application's log level settings, we will introduce a new resource type -
>> BROKER_LOGGERS
>>
>> Stanislav, can you explain why log level wouldn't be a "normal Kafka
>> config
>> setting"?
>>
>> Ryanne
>>
>> On Tue, Jan 8, 2019, 4:26 PM Stanislav Kozlovski > wrote:
>>
>> > Hey there everybody,
>> >
>> > I'd like to start a discussion about KIP-412. Please take a look at the
>> KIP
>> > if you can, I would appreciate any feedback :)
>> >
>> > KIP: KIP-412
>> > <
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels
>> > >
>> > JIRA: KAFKA-7800 
>> >
>> > --
>> > Best,
>> > Stanislav
>> >
>>
>
>
> --
> Best,
> Stanislav
>


-- 
Best,
Stanislav


[jira] [Resolved] (KAFKA-6431) Lock contention in Purgatory

2019-01-09 Thread Sriharsha Chintalapani (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani resolved KAFKA-6431.
---
Resolution: Fixed

> Lock contention in Purgatory
> 
>
> Key: KAFKA-6431
> URL: https://issues.apache.org/jira/browse/KAFKA-6431
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, purgatory
>Reporter: Ying Zheng
>Assignee: Ying Zheng
>Priority: Minor
> Fix For: 2.2.0
>
>
> Purgatory is the data structure in the Kafka broker that manages delayed 
> operations. There is a ConcurrentHashMap (Kafka Pool) that maps each operation key 
> to the operations (in a ConcurrentLinkedQueue) that are interested in the key.
> When an operation is done or expired, it's removed from the list 
> (ConcurrentLinkedQueue). When the list is empty, it's removed from the 
> ConcurrentHashMap. The 2nd operation has to be protected by a lock, to avoid 
> adding new operations into a list that is being removed. This is currently 
> done by a globally shared ReentrantReadWriteLock. All the read operations on 
> purgatory have to acquire the read permission of this lock. The list-removing 
> operations need the write permission of this lock.
> Our profiling result shows that the Kafka broker is spending a nontrivial amount 
> of time on this read-write lock.
> The problem is exacerbated when there are a large number of short operations. 
> For example, when we are doing sync produce operations (acks=all), a 
> DelayedProduce operation is added and then removed for each message. If the 
> QPS of the topic is not high, it's very likely that, when the operation is 
> done and removed, the list of that key (topic partition) also becomes empty, 
> and has to be removed while holding the write lock. This operation blocks all 
> the read / write operations on the entire purgatory for a while. As there are tens 
> of IO threads accessing purgatory concurrently, this shared lock can easily 
> become a bottleneck. 
> Actually, we only want to avoid concurrent read / write on the same key. The 
> operations on different keys do not conflict with each other.
> I suggest sharding purgatory into smaller partitions, and locking each 
> individual partition independently.
> Assuming there are 10 IO threads actively accessing purgatory, sharding 
> purgatory into 512 partitions will make the probability of 2 or more threads 
> accessing the same partition at the same time about 2%. We can also use 
> ReentrantLock instead of ReentrantReadWriteLock. When the read operations are 
> not much more than the write operations, ReentrantLock has lower overhead than 
> ReentrantReadWriteLock.
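
A tiny standalone sketch of the sharding / lock-striping idea described above (not the actual purgatory code); with 512 shards one would construct it as new StripedPurgatoryLockSketch(512).

{code}
import java.util.concurrent.locks.ReentrantLock;

// The key space is hashed onto a fixed number of shards, each guarded by its
// own ReentrantLock, so removals on one key no longer block readers and
// writers of unrelated keys.
public class StripedPurgatoryLockSketch {
    private final ReentrantLock[] locks;

    public StripedPurgatoryLockSketch(int shards) {
        locks = new ReentrantLock[shards];
        for (int i = 0; i < shards; i++) {
            locks[i] = new ReentrantLock();
        }
    }

    private ReentrantLock lockFor(Object key) {
        // Clamp the hash to a non-negative shard index.
        int idx = (key.hashCode() & 0x7fffffff) % locks.length;
        return locks[idx];
    }

    public void withKeyLock(Object key, Runnable action) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            action.run();   // e.g. remove an empty operation list for this key
        } finally {
            lock.unlock();
        }
    }
}
{code}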



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3300

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Add 2.1 version metadata upgrade (#6111)

--
[...truncated 2.25 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDi

Re: [VOTE] KIP-382 MirrorMaker 2.0

2019-01-09 Thread Ryanne Dolan
Thanks Jun.

> 103. My point was that the MirrorMakerConnector can die while the
Heartbeat connector is still alive. So, one can't solely rely on Heartbeat
for monitoring?

Each cluster will have a heartbeat topic produced by
MirrorHeartbeatConnector, which doesn't have an associated "source" other
than time. This topic gets picked up by downstream MirrorSourceConnectors
and replicated like A.heartbeat. So the heartbeat topic itself isn't
particularly useful for monitoring, but the downstream A.heartbeat shows that
heartbeats are being replicated successfully from A -> B. If a
MirrorSourceConnector fails while replicating A -> B, you'd still see
heartbeats in cluster B, but not A.heartbeat.

105. You're correct, you don't need to know "B" in order to go from A's
topic1 to B's A.topic1, i.e. migrating downstream. But you need to know "B"
to go from A's B.topic1 to B's topic1. In the latter case, you are
consuming a remote topic to begin with, and then migrating to the source
cluster, i.e. migrating upstream. N.B. you strip the "B" prefix in this
case, rather than add the "A" prefix. And you can't just strip all
prefixes, because you could be migrating from e.g. A's C.topic1 to B's
C.topic1, i.e. migrating "laterally", if you will.

I suppose we could break this out into multiple methods (upstream,
downstream, lateral etc), but I think that would add a lot more complexity
and confusion to the API. By providing both A and B, the single method can
always figure out what to do.
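
To make those prefix rules concrete, here is a hypothetical helper (not part of the KIP-382 API) that follows the three cases described above; it assumes "." as the separator and, naively, that un-prefixed topic names contain no dots.

```
public class RemoteTopicNameSketch {

    /** Maps a topic name as seen on sourceCluster to its name on targetCluster. */
    public static String rename(String topic, String sourceCluster, String targetCluster) {
        String targetPrefix = targetCluster + ".";
        if (topic.startsWith(targetPrefix)) {
            // Upstream migration: A's B.topic1 becomes B's topic1 (strip only B's prefix).
            return topic.substring(targetPrefix.length());
        }
        // Downstream or lateral: A's topic1 -> B's A.topic1, A's C.topic1 -> B's C.topic1.
        return topic.contains(".") ? topic : sourceCluster + "." + topic;
    }

    public static void main(String[] args) {
        System.out.println(rename("topic1", "A", "B"));     // A.topic1   (downstream)
        System.out.println(rename("B.topic1", "A", "B"));   // topic1     (upstream)
        System.out.println(rename("C.topic1", "A", "B"));   // C.topic1   (lateral)
    }
}
```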

107. done

Thanks,
Ryanne




On Wed, Jan 9, 2019 at 6:11 PM Jun Rao  wrote:

> Hi, Ryanne,
>
> 103. My point was that the MirrorMakerConnector can die while the
> Heartbeat connector is still alive. So, one can't solely rely on Heartbeat
> for monitoring?
>
> 105. Hmm, maybe I don't understand how this is done. Let's say we replicate
> topic1 from cluster A to cluster B. My understanding is that to translate
> the offset from A to B for a consumer group, we read A.checkpoint file in
> cluster B to get the timestamp of the last checkpointed offset, call
> consumer.offsetsForTimes() on A.topic1 in cluster B to translate the
> timestamp to a local offset, and return . Is
> that right? If so, in all steps, we don't need to know the
> targetClusterAlias B. We just need to know the connection string to
> cluster B, which targetConsumerConfig provides.
>
> 107. Thanks. Could you add that description to the KIP?
>
> Thanks,
>
> Jun
>
> On Mon, Jan 7, 2019 at 3:50 PM Ryanne Dolan  wrote:
>
>> Thanks Jun, I've updated the KIP as requested. Brief notes below:
>>
>> 100. added "...out-of-the-box (without custom handlers)..."
>>
>> 101. done. Good idea to include a MessageFormatter.
>>
>> 102. done.
>>
>> > 103. [...] why is Heartbeat a separate connector?
>>
>> Heartbeats themselves are replicated via MirrorSource/SinkConnector, so
>> if replication stops, you'll stop seeing heartbeats in downstream clusters.
>> I've updated the KIP to make this clearer and have added a bullet to
>> Rejected Alternatives.
>>
>> 104. added "heartbeat.retention.ms", "checkpoint.retention.ms", thanks.
>> The heartbeat topic doesn't need to be compacted.
>>
>> > 105. [...] I am not sure why targetClusterAlias is useful
>>
>> In order to map A's B.topic1 to B's topic1, we need to know B.
>>
>> > 106. [...] should the following properties be prefixed with "consumer."
>>
>> No, they are part of Connect's worker config.
>>
>> > 107. So, essentially it's running multiple logical connect clusters on
>> the same shared worker nodes?
>>
>> Correct. Rather than configure each Connector and Worker and Herder
>> individually, a single top-level configuration file is used. And instead of
>> running a bunch of separate worker processes on each node, a single process
>> runs multiple workers. This is implemented using a separate driver based on
>> ConnectDistributed, but which runs multiple DistributedHerders. Each
>> DistributedHerder uses a different Kafka cluster for coordination -- they
>> are completely separate apart from running in the same process.
>>
>> Thanks for helping improve the doc!
>> Ryanne
>>
>> On Fri, Jan 4, 2019 at 10:33 AM Jun Rao  wrote:
>>
>>> Hi, Ryanne,
>>>
>>> Thanks for KIP.  Still have a few more comments below.
>>>
>>> 100. "This is not possible with MirrorMaker today -- records would be
>>> replicated back and forth indefinitely, and the topics in either cluster
>>> would be merged inconsistently between clusters. " This is not 100% true
>>> since MM can do the topic renaming through MirrorMakerMessageHandler.
>>>
>>> 101. For both Heartbeat and checkpoint, could you define the full schema,
>>> including the field type? Also how are they serialized into the Kafka
>>> topic? Is it JSON or sth else? For convenience, it would be useful to
>>> provide a built-in MessageFormatter so that one can read each topic's
>>> data
>>> using tools like ConsoleConsumer.
>>>
>>> 102. For the public Heartbeat and Checkpoint class, could you list the
>>> public methods in e

Re: [VOTE] - KIP-213 (new vote) - Simplified and revised.

2019-01-09 Thread Guozhang Wang
Hello Adam,

I'm +1 on the current proposal, thanks!


Guozhang

On Mon, Jan 7, 2019 at 6:13 AM Adam Bellemare 
wrote:

> Hi All
>
> I would like to call a new vote on KIP-213. The design has changed
> substantially. Perhaps more importantly, the KIP and associated
> documentation has been greatly simplified. I know this KIP has been on the
> mailing list for a long time, but the help from John Roesler and Guozhang
> Wang have helped put it into a much better state. I would appreciate any
> feedback or votes.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>
>
>
> Thank you
>
> Adam Bellemare
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-09 Thread Guozhang Wang
Hello Florian,

Just checking whether you have read my previous email and whether you are happy
with it. We have the 2.2 KIP freeze deadline on the 24th of this month, and the
PR itself is getting quite close. So it'll be great if we can reach
agreement on it and get it into the 2.2.0 release.


Guozhang


On Mon, Dec 17, 2018 at 2:39 PM Guozhang Wang  wrote:

> Hi Florian / John,
>
> Just wanted to throw a couple minor thoughts on the current proposal:
>
> 1) Regarding the interface / function name, I'd propose we call the
> interface `NamedOperation`, which would be implemented by Produced /
> Consumed / Printed / Joined / Grouped / Suppressed (note I intentionally
> exclude Materialized here since its semantics is quite different), and have the
> default class that implements `NamedOperation` be `Named`, which would be
> used in the overloaded functions we are adding (a rough sketch follows below).
> The main reason is to have consistency in naming.
>
> 2) As a minor tweak, I think it's better to use Joined.name() both for the
> repartition topic it may generate and for the map processor used for
> group-by (currently this name is only used for the repartition topic).
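
A rough sketch of what the proposed `NamedOperation` / `Named` pair in 1) might look like; the exact shape (generic bound, method and factory names) is an assumption and still up for discussion.

```
// Sketch only: a common naming interface that control classes could implement.
public interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name);
}

// The plain default implementation that new overloads would accept for
// operators without a dedicated control class (e.g. filter, map).
class Named implements NamedOperation<Named> {
    private final String name;

    private Named(String name) {
        this.name = name;
    }

    public static Named as(String name) {
        return new Named(name);
    }

    public String name() {
        return name;
    }

    @Override
    public Named withName(String name) {
        return new Named(name);
    }
}
```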
>
>
> Florian: if you think this proposal makes sense, please feel free to go
> ahead and update the PR; after we have made a first pass on it and feel
> confident about it, we can go ahead with the VOTING process. About the
> implementation of 2) above, this may be out of your implementation scope,
> so feel free to leave it outside your PR, while Bill, who originally worked
> on the Grouped KIP, can make a follow-up PR for it.
>
> Guozhang
>
> On Fri, Dec 14, 2018 at 9:43 PM Guozhang Wang  wrote:
>
>> Hello Florian,
>>
>> Really appreciate you for your patience.
>>
>> I know that we've discussed the approach of adding overloaded
>> functions and rejected it early on. But looking deeper into the current PR
>> I realized that this approach has a danger of great API confusion for users
>> (I tried to explain my thoughts in the PR, but it was not very clear) ---
>> the basic idea is that, today we already have a few existing control
>> classes including Grouped, Joined, Suppressed that allow users to specify
>> serdes etc., as well as a "name" which can then be used to define the
>> processor name / internal topic names in the topology (the static function
>> names are not consistent, which I think we should fix as well). And the Named
>> interface, by extending the lambda function interfaces like ValueJoiner /
>> Predicate etc., opens the door to another way to specify the names again.
>>
>> So in order to achieve consistency, we are left with generally two
>> options:
>>
>> 1) only allow users to specify names via the lambda interfaces that
>> extend the Named interface. This means we'd better remove the naming mechanism
>> from the existing control objects to keep consistency.
>>
>> 2) only allow users to specify names via control classes, and we
>> introduce a new class (Named) for those which do not have one yet --- this
>> leads to the overloaded functions.
>>
>> I did a quick count on the number of overloaded functions, and summing from
>> KTable (8) / KStream (15) / KGroupedStream (6) / KGroupedTable (6) /
>> TimeWindowedKStream (6) / SessionWindowedKStream (6) we got about 47
>> overloaded functions (our guess was pretty close!) -- note this is based on
>> John's proposal that we can let the existing Grouped / Joined extend Named
>> and hence we only need overloaded functions with a default NamedOperation
>> for those operators that do not have a control classes already.
>>
>> Thinking about this approach, I feel it is not too bad compared with
>> either 1) above, which would require us to deprecate a lot of public
>> functions around name(), or having a mixed mechanism for naming, which
>> could lead to very confusing behavior for users. Additionally, for most
>> users, who would only want to specify the names for those stateful
>> operations which have internal topics / state stores and hence are more
>> keen on upgrade compatibility, those added overloads would be rarely
>> used functions for them anyway. And by letting the existing control classes
>> extend Named, we can have a unified method name for the static constructor as
>> well.
>>
>>
>>
>> Guozhang
>>
>>
>> On Fri, Dec 14, 2018 at 10:24 AM John Roesler  wrote:
>>
>>> Hi Florian,
>>>
>>> Sorry about the run-around of rejecting the original proposal,
>>> only to return to it later on. Hopefully, it's more encouraging
>>> than frustrating that we're coming around to your initial way of
>>> thinking.
>>>
>>> Thanks!
>>> -John
>>>
>>> On Thu, Dec 13, 2018 at 4:28 PM Florian Hussonnois <
>>> fhussonn...@gmail.com>
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > Thanks again. I agree with your propositions.
>>> > Also IMHO, overloading all methods (filter, map) to accept a new
>>> control
>>> > object seems to provide a more natural development experience for
>>> users.
>>> >
>>> > Actually, this was the first proposition for this KIP, but we have
>>> rejected

Re: [VOTE] KIP-382 MirrorMaker 2.0

2019-01-09 Thread Jun Rao
Hi, Ryanne,

103. My point was that the MirrorMakerConnector can die while the Heartbeat
connector is still alive. So, one can't solely rely on Heartbeat for
monitoring?

105. Hmm, maybe I don't understand how this is done. Let's say we replicate
topic1 from cluster A to cluster B. My understanding is that to translate
the offset from A to B for a consumer group, we read A.checkpoint file in
cluster B to get the timestamp of the last checkpointed offset, call
consumer.offsetsForTimes() on A.topic1 in cluster B to translate the
timestamp to a local offset, and return . Is
that right? If so, in all steps, we don't need to know the
targetClusterAlias B. We just need to know the connection string to cluster
B, which targetConsumerConfig provides.
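
For illustration, a minimal sketch of the offsetsForTimes() step of that translation; the checkpoint lookup and error handling are omitted, and the method name and parameters here are hypothetical rather than the KIP's actual translateOffsets() signature.

```
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetTranslationSketch {

    // targetConsumerConfig is assumed to already contain bootstrap.servers for cluster B;
    // checkpointTimestamp is assumed to come from the last checkpoint for the group.
    static long translate(Properties targetConsumerConfig,
                          String sourceClusterAlias,
                          String topic,
                          int partition,
                          long checkpointTimestamp) {
        // The remote topic in the target cluster, e.g. "A.topic1" for source alias "A".
        TopicPartition remote = new TopicPartition(sourceClusterAlias + "." + topic, partition);
        targetConsumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
        targetConsumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(targetConsumerConfig)) {
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(remote, checkpointTimestamp));
            OffsetAndTimestamp ot = result.get(remote);
            return ot == null ? -1L : ot.offset();   // local offset in the target cluster
        }
    }
}
```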

107. Thanks. Could you add that description to the KIP?

Thanks,

Jun

On Mon, Jan 7, 2019 at 3:50 PM Ryanne Dolan  wrote:

> Thanks Jun, I've updated the KIP as requested. Brief notes below:
>
> 100. added "...out-of-the-box (without custom handlers)..."
>
> 101. done. Good idea to include a MessageFormatter.
>
> 102. done.
>
> > 103. [...] why is Heartbeat a separate connector?
>
> Heartbeats themselves are replicated via MirrorSource/SinkConnector, so if
> replication stops, you'll stop seeing heartbeats in downstream clusters.
> I've updated the KIP to make this clearer and have added a bullet to
> Rejected Alternatives.
>
> 104. added "heartbeat.retention.ms", "checkpoint.retention.ms", thanks.
> The heartbeat topic doesn't need to be compacted.
>
> > 105. [...] I am not sure why targetClusterAlias is useful
>
> In order to map A's B.topic1 to B's topic1, we need to know B.
>
> > 106. [...] should the following properties be prefixed with "consumer."
>
> No, they are part of Connect's worker config.
>
> > 107. So, essentially it's running multiple logical connect clusters on
> the same shared worker nodes?
>
> Correct. Rather than configure each Connector and Worker and Herder
> individually, a single top-level configuration file is used. And instead of
> running a bunch of separate worker processes on each node, a single process
> runs multiple workers. This is implemented using a separate driver based on
> ConnectDistributed, but which runs multiple DistributedHerders. Each
> DistributedHerder uses a different Kafka cluster for coordination -- they
> are completely separate apart from running in the same process.
>
> Thanks for helping improve the doc!
> Ryanne
>
> On Fri, Jan 4, 2019 at 10:33 AM Jun Rao  wrote:
>
>> Hi, Ryanne,
>>
>> Thanks for KIP.  Still have a few more comments below.
>>
>> 100. "This is not possible with MirrorMaker today -- records would be
>> replicated back and forth indefinitely, and the topics in either cluster
>> would be merged inconsistently between clusters. " This is not 100% true
>> since MM can do the topic renaming through MirrorMakerMessageHandler.
>>
>> 101. For both Heartbeat and checkpoint, could you define the full schema,
>> including the field type? Also how are they serialized into the Kafka
>> topic? Is it JSON or sth else? For convenience, it would be useful to
>> provide a built-in MessageFormatter so that one can read each topic's data
>> using tools like ConsoleConsumer.
>>
>> 102. For the public Heartbeat and Checkpoint class, could you list the
>> public methods in each class?
>>
>> 103. I am wondering why Heartbeat is a separate connector. A MirrorMaker
>> connector can die independently of the Heartbeat connector, which seems to
>> defeat the purpose of heartbeat.
>>
>> 104. Is the Heartbeat topic also a compacted topic? If not, how long is it
>> retained for?
>>
>> 105. For the following, I am not sure why targetClusterAlias is useful?
>> The
>> checkpoint file seems to only include sourceClusterAlias.
>>
>> Map translateOffsets(Map targetConsumerConfig,
>> String sourceClusterAlias, String targetClusterAlias, String
>> remoteGroupId)
>>
>> 106. In the configuration example, should the following properties be
>> prefixed with "consumer."?
>> key.converter
>>  =
>> org.apache.kafka.connect.converters.ByteArrayConverter
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/org.apache.kafka.connect.converters.ByteArrayConverter
>> >
>> value.converter
>>  =
>> org.apache.kafka.connect.converters.ByteArrayConverter
>> <
>> https://cwiki.apache.org/confluence/display/KAFKA/org.apache.kafka.connect.converters.ByteArrayConverter
>> >
>>
>> 107. Could you add a bit more description on how connect-mirror-maker.sh
>> is
>> implemented? My understanding is that it will start as many as
>> separate DistributedHerder as the Kafka clusters? So, essentially it's
>> running multiple logical connect clusters on the same shared worker nodes?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Thu, Dec 20, 2018 at 5:23 PM Srinivas Reddy <
>> srinivas96all...@gmail.com>
>> wrote:
>>
>> > +1 (non binding)
>> >
>> >

Re: [DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers

2019-01-09 Thread Guozhang Wang
Hello Wladimir,

Just checking if you are still working on this KIP. We have the 2.2 KIP
freeze deadline on the 24th of this month, and it'll be great to complete this KIP
by then so the 2.2.0 release can include this feature.


Guozhang

On Mon, Dec 3, 2018 at 11:26 PM Guozhang Wang  wrote:

> Hello Wladimir,
>
> I've thought about the two options and I think I'm sold on the second
> option, and actually I think it is better to generalize it so it can potentially be
> used for other clients (producer, consumer) as well, since they also have
> similar dependency-injection requests for the metrics reporter, partitioner,
> partition assignor, etc.
>
> So I'd suggest we add the following to AbstractConfig directly (note I
> intentionally renamed the class to ConfiguredInstanceFactory to be used for
> other clients as well):
>
> ```
> AbstractConfig(ConfigDef definition, Map originals,
> ConfiguredInstanceFactory, boolean doLog)
> ```
>
> And then in StreamsConfig add:
>
> ```
> StreamsConfig(Map props, ConfiguredInstanceFactory)
> ```
>
> which would call the above AbstractConfig constructor (we can leave to
> core team to decide when they want to add for producer and consumer);
>
> And in KafkaStreams / TopologyTestDriver we can add one overloaded
> constructor each that includes all the parameters including the
> ConfiguredInstanceFactory --- for those who only want `factory` but not
> `client-suppliers` for example, they can set it to `null` and the streams
> library will just use the default one.
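
For readers following along, a rough sketch of what such a factory interface might look like; the names and signature below are assumptions for illustration, since the KIP has not pinned the interface down.

```
import java.util.Map;

// Sketch only: an extension point that lets a DI container supply handler instances.
public interface ConfiguredInstanceFactorySketch {

    /**
     * Creates an instance of the requested handler class, e.g. by asking a
     * dependency-injection container instead of reflective instantiation.
     */
    <T> T getConfiguredInstance(Class<T> type, String className, Map<String, Object> configs);
}

// A trivial default that mimics today's reflective instantiation; a Spring user
// could instead return beans from an ApplicationContext. A real implementation
// would also pass configs to Configurable handlers.
class ReflectiveFactory implements ConfiguredInstanceFactorySketch {
    @Override
    public <T> T getConfiguredInstance(Class<T> type, String className, Map<String, Object> configs) {
        try {
            return type.cast(Class.forName(className).getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }
}
```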
>
>
> Guozhang
>
>
> On Sun, Dec 2, 2018 at 12:13 PM Wladimir Schmidt 
> wrote:
>
>> Hello Guozhang,
>> sure, the first approach is very straight-forward and allows minimal
>> changes to the Kafka Streams API.
>> On the other hand, the second approach with the interface implementation
>> looks cleaner to me.
>> I totally agree that this should be discussed first before it is
>> implemented.
>>
>> Thanks, Wladimir
>>
>>
>> On 17-Nov-18 23:37, Guozhang Wang wrote:
>>
>> Hello folks,
>>
>> I'd like to revive this thread for discussion. After reading the previous
>> emails I think I'm still leaning a bit towards re-enabling passing a
>> StreamsConfig to Kafka Streams constructors, compared with adding a
>> ConfiguredStreamsFactory as an additional parameter to overloaded
>> KafkaStreams constructors: although the former seems less clean, as it
>> requires users to read through the usage of AbstractConfig to know how to
>> use it in their frameworks, this to me is a solvable problem through
>> documentations, plus AbstractConfig is a public interface already and hence
>> the additional ConfiguredStreamsFactory to me is really a bit overlapping
>> in functionality.
>>
>>
>> Guozhang
>>
>>
>>
>> On Sun, Oct 21, 2018 at 1:41 PM Wladimir Schmidt  
>>  wrote:
>>
>>
>> Hi Damian,
>>
>> The first approach was added only because it had been initially proposed
>> in my pull request,
>> which started a discussion and thus, the KIP-378 was born.
>>
>> Yes, I would like to have something "injectable". In this regard, a
>> `ConfiguredStreamsFactory` (name is a subject to discussion)
>> is a good option to be introduced into `KafkaStreams` constructor.
>>
>> Even though, I consider the second approach to be cleaner, it involves a
>> certain amount of refactoring of the streams library.
>> The first approach, on the contrary, adds (or removes deprecated
>> annotation, if the method has not been removed yet) only additional
>> constructors with
>> considerably less intervention into a streams library (no changes, which
>> would break an API. Please see the pull 
>> request: https://github.com/apache/kafka/pull/5344).
>>
>> Thanks
>> Wladimir
>>
>> On 10-Oct-18 15:51, Damian Guy wrote:
>>
>> Hi Wladimir,
>>
>> Of the two approaches in the KIP - i feel the second approach is cleaner.
>> However, am i correct in assuming that you want to have the
>> `ConfiguredStreamsFactory` as a ctor arg in `StreamsConfig` so that
>>
>> Spring
>>
>> can inject this for you?
>>
>> Otherwise you could just put the ApplicationContext as a property in the
>> config and then use that via the configure method of the appropriate
>> handler to get your actual handler.
>>
>> Thanks,
>> Damian
>>
>> On Tue, 9 Oct 2018 at 01:55, Guozhang Wang  
>>  wrote:
>>
>>
>> John, thanks for the explanation, now it makes much more sense to me.
>>
>> As for the concrete approach, to me it seems the first option requires
>>
>> less
>>
>> changes than the second (ConfiguredStreamsFactory based) approach,
>>
>> whereas
>>
>> the second one requires an additional interface that is overlapping with
>> the AbstractConfig.
>>
>> I'm aware that in KafkaProducer / KafkaConsumer we do not have public
>> constructors for taking a ProducerConfig or ConsumerConfig directly, and
>> anyone using Spring can share how you've worked around it by far? If it
>>
>> is
>>
>> very awkward I'm not against just adding the XXXConfigs to the
>>
>> constructors
>>
>> directly.
>>
>> Guozhang
>>
>> On Fri, Oct 5, 2018 at 1:48 PM, John Ro

Build failed in Jenkins: kafka-2.1-jdk8 #101

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-7786; Ignore OffsetsForLeaderEpoch response if epoch changed while

--
[...truncated 917.14 KB...]

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson STARTED

kafka.zk.ReassignPartitionsZNodeTest > testDecodeValidJson PASSED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange STARTED

kafka.zk.KafkaZkClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testCreateAndGetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testLogDirGetters STARTED

kafka.zk.KafkaZkClientTest > testLogDirGetters PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr STARTED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr PASSED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress STARTED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters PASSED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetLogConfigs STARTED

kafka.zk.KafkaZkClientTest > testGetLogConfigs PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath STARTED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursiveWithControllerEpochVersionCheck 
STARTED

kafka.zk.KafkaZkClie

[jira] [Created] (KAFKA-7808) AdminClient#describeTopic should not throw InvalidTopic if topic name is not found

2019-01-09 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7808:


 Summary: AdminClient#describeTopic should not throw InvalidTopic 
if topic name is not found
 Key: KAFKA-7808
 URL: https://issues.apache.org/jira/browse/KAFKA-7808
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Reporter: Guozhang Wang


In AdminClient#describeTopic, we have the following logic:

{code}
if (!cluster.topics().contains(topicName)) {
    future.completeExceptionally(new InvalidTopicException("Topic " + topicName + " not found."));
    continue;
}
{code}

However, {{InvalidTopicException}} is a non-retriable exception and is used to 
indicate that the topic name contains invalid chars or is too long etc., and 
hence is not the correct exception to use here. We should instead throw the retriable 
{{UnknownTopicOrPartitionException}}.

We should make sure any callers of this logic are cleaned up when fixing 
it.
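
A sketch of what the change could look like at that call site (callers would still need auditing as noted above); this is illustrative, not a finalized patch:

{code}
if (!cluster.topics().contains(topicName)) {
    // Use the retriable UnknownTopicOrPartitionException rather than the
    // non-retriable InvalidTopicException.
    future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
    continue;
}
{code}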



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7786) Fast update of leader epoch may stall partition fetching due to FENCED_LEADER_EPOCH

2019-01-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7786.

   Resolution: Fixed
Fix Version/s: 2.1.1
   2.2.0

> Fast update of leader epoch may stall partition fetching due to 
> FENCED_LEADER_EPOCH
> ---
>
> Key: KAFKA-7786
> URL: https://issues.apache.org/jira/browse/KAFKA-7786
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Critical
> Fix For: 2.2.0, 2.1.1
>
>
> KIP-320/KAFKA-7395 Added FENCED_LEADER_EPOCH error response to a 
> OffsetsForLeaderEpoch request if the epoch in the request is lower than the 
> broker's leader epoch. ReplicaFetcherThread builds a OffsetsForLeaderEpoch 
> request under _partitionMapLock_, sends the request outside the lock, and 
> then processes the response under _partitionMapLock_. The broker may receive 
> LeaderAndIsr with the same leader but with the next leader epoch, remove and 
> add partition to the fetcher thread (with partition state reflecting the 
> updated leader epoch) – all while the OffsetsForLeaderEpoch request (with the 
> old leader epoch) is still outstanding/ waiting for the lock to process the 
> OffsetsForLeaderEpoch response. As a result, partition gets removed from 
> partitionStates and this broker will not fetch for this partition until the 
> next LeaderAndIsr which may take a while. We will see log message like this:
> [2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
> fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current 
> leader. Will await the new LeaderAndIsr state before resuming fetching. 
> (kafka.server.ReplicaFetcherThread)
> We saw this happen with 
> kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True.
>  This test does partition re-assignment while bouncing 2 out of 4 total 
> brokers. When the failure happen, each bounced broker was also a controller. 
> Because of re-assignment, the controller updates leader epoch without 
> updating the leader on controller change or on broker startup, so we see 
> several leader epoch changes without the leader change, which increases the 
> likelihood of the race condition described above.
> Here is exact events that happen in this test (around the failure):
> We have 4 brokers Brokers 1, 2, 3, 4. Partition re-assignment is started for 
> test_topic-17 [2, 4, 1]  —> [3, 1, 2]. At time t0, leader of test_topic-17 is 
> broker 2.
>  # clean shutdown of broker 3, which is also a controller
>  # broker 4 becomes controller, continues re-assignment and updates leader 
> epoch for test_topic-17 to 6 (with same leader)
>  # broker 2 (leader of test_topic-17) receives new leader epoch: 
> “test_topic-17 starts at Leader Epoch 6 from offset 1388. Previous Leader 
> Epoch was: 5”
>  # broker 3 is started again after clean shutdown
>  # controller sees broker 3 startup, and sends LeaderAndIsr(leader epoch 6) 
> to broker 3
>  # controller updates leader epoch to 7
>  # broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 
> 7: “test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader 
> Epoch was: 6”
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from 
> controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6” 
> and sends OffsetsForLeaderEpoch request to broker 2
>  # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from 
> controller; removes fetcher thread and adds fetcher thread + executes 
> AbstractFetcherThread.addPartitions() which updates partition state with 
> leader epoch 7
>  # broker 3 receives FENCED_LEADER_EPOCH in response to 
> OffsetsForLeaderEpoch(leader epoch 6), because the leader received 
> LeaderAndIsr for leader epoch 7 before it got OffsetsForLeaderEpoch(leader 
> epoch 6) from broker 3. As a result, it removes partition from 
> partitionStates and it does not fetch until controller updates leader epoch 
> and sends LeaderAndIsr for this partition to broker 3. The test fails, 
> because re-assignment does not finish on time (due to broker 3 not fetching).
>  
> One way to address this is possibly add more state to PartitionFetchState. 
> However, we may introduce other race condition. A cleaner way, I think, is to 
> return leader epoch in the OffsetsForLeaderEpoch response with 
> FENCED_LEADER_EPOCH error, and then ignore the error if partition state 
> contains a higher leader epoch. The advantage is less state maintenance, but 
> disadvantage is it requires bumping inter-broker protocol.



--
This message was se

Jenkins build is back to normal : kafka-trunk-jdk8 #3297

2019-01-09 Thread Apache Jenkins Server
See 




Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2019-01-09 Thread Gwen Shapira
Thanks for the data driven approach, Stanislav. I love it :)
And thank you for sharing your formula, Boyang. I totally agree that
rebalance latency will not grow linearly with the consumer group size.

My recommendation, considering what we know today:
1. Add the limit config, and set it to MAX_INT by default (effectively
unlimited, without a magic number like -1)
2. Document our thoughts - the concern about runaway groups,
Pinterest's 500 limit, Confluent's experience with few thousand
consumers in a group, the conclusions from Stanislav's memory research
(Personally, I wouldn't want what is essentially a linked list that we
iterate to grow beyond 1M).

Most likely, 99% of the users won't need it and those who do will
have the right information to figure things out (or at least, they'll
know everything that we know).

WDYT?

On Wed, Jan 9, 2019 at 4:25 AM Stanislav Kozlovski
 wrote:
>
> Hey everybody,
>
> I ran a quick benchmark and took some heap dumps to gauge how much memory
> each consumer in a group is using, all done locally.
> The setup was the following: 10 topics with 10 partitions each (100
> partitions total) and one consumer group with 10 members, then expanded to
> 20 members.
> Here are some notes of my findings in a public Google doc:
> https://docs.google.com/document/d/1Z4aY5qg8lU2uNXzdgp_30_oJ9_I9xNelPko6GIQYXYk/edit?usp=sharing
>
>
> On Mon, Jan 7, 2019 at 10:51 PM Boyang Chen  wrote:
>
> > Hey Stanislav,
> >
> > I think the time taken to rebalance is not linearly correlated with number
> > of consumers with our application. As for our current and future use cases,
> > the main concern for Pinterest is still on the broker memory not CPU,
> > because crashing server by one application could have cascading effect on
> > all jobs.
> > Do you want to drive a more detailed formula on how to compute the memory
> > consumption against number of consumers within the group?
> >
> > In the meantime, I'm pretty much bought into the motivation of this KIP, so I
> > think the follow-up work is just refinement to make the new config easy to
> > use. We should be good to vote, IMO.
> >
> > Best,
> > Boyang
> > 
> > From: Stanislav Kozlovski 
> > Sent: Monday, January 7, 2019 4:21 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > metadata growth
> >
> > Hey there,
> >
> > Per Gwen's comments, I slightly reworked the motivation section. Let me
> > know if it's any better now
> >
> > I completely agree that it would be best if we were to add a recommended
> > number to a typical consumer group size. There is a problem that timing the
> > CPU usage and rebalance times of consumer groups is tricky. We can update
> > the KIP with memory guidelines (e.g 1 consumer in a group uses X memory,
> > therefore 100 use Y).
> > I fear that the most useful recommendations though would be knowing the CPU
> > impact of large consumer groups and the rebalance times. That is,
> > unfortunately, tricky to test and measure.
> >
> > @Boyang, you had mentioned some numbers used in Pinterest. If available to
> > you, would you be comfortable sharing the number of consumers you are using
> > in a group and maybe the potential time it takes to rebalance it?
> >
> > I'd appreciate any anecdotes regarding consumer group sizes from the
> > community
> >
> > Best,
> > Stanislav
> >
> > On Thu, Jan 3, 2019 at 1:59 AM Boyang Chen  wrote:
> >
> > > Thanks Gwen for the suggestion! +1 on the guidance of defining
> > > group.max.size. I guess a sample formula would be:
> > > 2 * (# of brokers * average metadata cache size * 80%) / (# of consumer
> > > groups * size of a single member metadata)
> > >
> > > if we assumed non-skewed partition assignment and pretty fair consumer
> > > group consumption. The "2" is the 95th percentile of the normal distribution and
> > > 80% is just to buffer some memory capacity, which are both open to
> > > discussion. This config should be useful for the Kafka platform team to make
> > > sure one extremely large consumer group won't bring down the whole cluster.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Boyang
> > >
> > > 
> > > From: Gwen Shapira 
> > > Sent: Thursday, January 3, 2019 2:59 AM
> > > To: dev
> > > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > > metadata growth
> > >
> > > Sorry for joining the fun late, but I think the problem we are solving
> > > evolved a bit in the thread, and I'd like to have better understanding
> > > of the problem before voting :)
> > >
> > > Both KIP and discussion assert that large groups are a problem, but
> > > they are kinda inconsistent regarding why they are a problem and whose
> > > problem they are...
> > > 1. The KIP itself states that the main issue with large groups are
> > > long rebalance times. Per my understanding, this is mostly a problem
> > > for the application that consumes data, but not really a p

Build failed in Jenkins: kafka-2.1-jdk8 #100

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-7799; Fix flaky test RestServerTest.testCORSEnabled (#6106)

--
[...truncated 427.31 KB...]
kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade STARTED

kafka.server.IsrExpirationTest > testIsrExpirationIfNoFetchRequestMade PASSED

kafka.server.DelayedOperationTest > testRequestPurge STARTED

kafka.server.DelayedOperationTest > testRequestPurge PASSED

kafka.server.DelayedOperationTest > testRequestExpiry STARTED

kafka.server.DelayedOperationTest > testRequestExpiry PASSED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist STARTED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride PASSED

kafka.server.DelayedOperationTest > testTryCompleteLockContention STARTED

kafka.server.DelayedOperationTest > testTryCompleteLockContention PASSED

kafka.server.DelayedOperationTest > testTryCompleteWithMultipleThreads STARTED

kafka.server.DelayedOperationTest > testTryCompleteWithMultipleThreads PASSED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations STARTED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations PASSED

kafka.server.DelayedOperationTest > testRequestSatisfaction STARTED

kafka.server.DelayedOperationTest > testRequestSatisfaction PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLock STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLock PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldFetchLeaderEpochOnFirstFetchOnly STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldFetchLeaderEpochOnFirstFetchOnly PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > issuesEpochRequestFromLocalReplica 
STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > issuesEpochRequestFromLocalReplica 
PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > shouldTruncateToReplicaOffset 
STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > shouldTruncateToReplicaOffset 
PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > shouldFetchOneReplicaAtATime 
STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > shouldFetchOneReplicaAtATime PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldPollIndefinitelyIfReplicaNotAvailable STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldPollIndefinitelyIfReplicaNotAvailable PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldTruncateToEndOffsetOfLargestCommonEpoch STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldTruncateToEndOffsetOfLargestCommonEpoch PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica PASSED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldFetchNonDelayedAndNonTruncatingReplicas STARTED

kafka.server.ReplicaAlterLogDirsThreadTest > 
shouldFetchNonDelayedAndNonTruncatingReplicas PASSED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresMultipleLogSegments 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresMultipleLogSegments 
PASSED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresMultipleLogSegments 
PASSED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresSingleLogSegment 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointNoFailuresSingleLogSegment PASSED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresSingleLogSegment 
STARTED

kafka.server.LogRecoveryTest > testHWCheckpointWithFailuresSingleLogSegment 
PASSED

kafka.server.HighwatermarkPersistenceTest > 
testHighWatermarkPersistenceMultiplePartitions STARTED

kafka.server.HighwatermarkPersistenceTest > 
testHighWatermarkPersistenceMultiplePartitions PASSED

kafka.server.HighwatermarkPersistenceTest > 
testHighWatermarkPersistenceSinglePartition STARTED

kafka.server.HighwatermarkPersistenceTest > 
testHighWatermarkPersistenceSinglePartition PASSED

kafka.server.DynamicConfigTest > shouldFailFollowerConfigsWithInvalidValues 
STARTED

kafka.server.DynamicConfigTest > shouldFailFollowerConfigsWithInvalidValues 
PASSED

kafka.server.DynamicConfigTest > shouldFailWhenChangingUserUnknownConfig STARTED

kafka.server.DynamicConfigTest > shouldFailWhenChangingUserUnknownConfig PASSED

kafka.server.DynamicConfigT

[jira] [Created] (KAFKA-7807) Special Characters in Topic Name Appear to Break Replication

2019-01-09 Thread Joseph Niemiec (JIRA)
Joseph Niemiec created KAFKA-7807:
-

 Summary: Special Characters in Topic Name Appear to Break 
Replication
 Key: KAFKA-7807
 URL: https://issues.apache.org/jira/browse/KAFKA-7807
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.1
Reporter: Joseph Niemiec


We recently upgraded a cluster from 1.0.0 to 1.1.1 and experienced an odd issue 
when we restarted the first broker. All ISR values quickly jumped to zero but 
no replication would take place to the broker; even after waiting over 30 minutes 
nothing occurred. After some investigation we believe it was due to an unused topic 
having been created with special characters. Once we deleted this topic and 
restarted the broker everything was fine. We did not reproduce the issue to 
confirm that this is why replication resumed, but the timing was near perfect: 
replication resumed right when the problem topic was deleted.

 

Topic name (the quotes are part of the name)

```

‘service_prod_premium_out’

```

The error message that led us to decide to delete this topic:
```

[2019-01-09 11:14:38,852] ERROR [KafkaApi-3] Error when handling request 
//Removed Partition and Replica Data
java.nio.file.InvalidPathException: Malformed input or input 
contains unmappable characters: 
/grid/7/kafka-logs/?service_prod_premium_out?-4
 at sun.nio.fs.UnixPath.encode(UnixPath.java:147)
 at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
 at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
 at java.io.File.toPath(File.java:2234)
 at kafka.log.LogManager.$anonfun$getOrCreateLog$1(LogManager.scala:676)
 at scala.Option.getOrElse(Option.scala:121)
 at kafka.log.LogManager.getOrCreateLog(LogManager.scala:646)
 at kafka.cluster.Partition.$anonfun$getOrCreateReplica$1(Partition.scala:178)
 at kafka.utils.Pool.getAndMaybePut(Pool.scala:65)
 at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:174)
 at kafka.cluster.Partition.$anonfun$makeFollower$3(Partition.scala:316)
 at kafka.cluster.Partition.$anonfun$makeFollower$3$adapted(Partition.scala:316)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.cluster.Partition.$anonfun$makeFollower$1(Partition.scala:316)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
 at kafka.cluster.Partition.makeFollower(Partition.scala:309)
 at 
kafka.server.ReplicaManager.$anonfun$makeFollowers$4(ReplicaManager.scala:1242)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138)
 at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:236)
 at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:229)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:138)
 at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:1236)
 at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1082)
 at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:183)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:108)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
 at java.lang.Thread.run(Thread.java:748)

 ```
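
For context, Kafka's documented legal topic-name characters are ASCII letters, digits, '.', '_' and '-' (up to 249 characters), which the curly quotes above violate. A hypothetical client-side guard along these lines (not part of Kafka) would catch such names before topic creation:

```java
import java.util.regex.Pattern;

public final class TopicNameCheck {
    // Kafka's legal topic-name characters: ASCII letters, digits, '.', '_' and '-', max 249 chars.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    private TopicNameCheck() { }

    public static boolean isLegal(final String topic) {
        return LEGAL.matcher(topic).matches();
    }

    public static void main(final String[] args) {
        System.out.println(isLegal("service_prod_premium_out"));   // true
        System.out.println(isLegal("‘service_prod_premium_out’")); // false: curly quotes
    }
}
```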



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7806) Windowed Aggregations should wrap default key serde if none is specified

2019-01-09 Thread John Roesler (JIRA)
John Roesler created KAFKA-7806:
---

 Summary: Windowed Aggregations should wrap default key serde if 
none is specified
 Key: KAFKA-7806
 URL: https://issues.apache.org/jira/browse/KAFKA-7806
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler


In Streams, windowing a stream by either time or session windows causes the 
stream's keys to be transformed from `K` to `Windowed<K>`.

Since this is a well defined transition, it's not necessary for developers to 
explicitly provide a `Serde<Windowed<K>>`. For convenience, Streams, which 
already knows the key serde (`Serde<K>`), automatically wraps it in case it's 
needed by downstream operators.

However, this automatic wrapping only takes place if the key serde has been 
explicitly provided in the topology. If the topology relies on the 
`default.key.serde` configuration, no wrapping takes place, and downstream 
operators will encounter a ClassCastException trying to cast a `Windowed<K>` (the 
windowed key) to whatever type the default serde handles (which is the key 
wrapped inside the windowed key).

Specifically, the key serde forwarding logic is:

in `org.apache.kafka.streams.kstream.internals.TimeWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : null`

and in `org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImpl`:

`materializedInternal.keySerde() != null ? new 
WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null`

 

This pattern of not "solidifying" the default key serde is common in Streams. 
Not all operators need a serde, and the default serde may not be applicable to 
all operators. So, it would be a mistake for arbitrary operators to grab the 
default serde and pass it downstream as if it had been explicitly set.

 

However, in this case specifically, all windowed aggregations are stateful, so 
if we don't have an explicit key serde at this point, we know that we have used 
the default serde in the window store. If the default serde were incorrect, an 
exception would be thrown by the windowed aggregation itself. So it actually is 
safe to wrap the default serde in a windowed serde and pass it downstream, 
which would result in a better development experience.

 

Unfortunately, the default serde is set via config, but the windowed serde 
wrapping happens during DSL building, when the config is not generally 
available. Therefore, we would need a special windowed serde wrapper that 
signals that it wraps the default serde, which would be fully resolved during 
operators' init call.

For example, something of this nature:

`materializedInternal.keySerde() != null ? new 
FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.size()) : 
FullTimeWindowedSerde.wrapDefault(windows.size())`

etc.

 

Complicating the situation slightly, all the windowed serializers and 
deserializers will resolve a runtime inner class using 
`default.windowed.key.serde.inner` if given a null inner serde to wrap. 
However, at this point in the topology build, we do know that the windowed 
aggregation has specifically used the `default.key.serde`, not the 
`default.windowed.key.serde.inner` to persist its state to the window store, 
therefore, it should be correct to wrap the default key serde specifically and 
not use the `default.windowed.key.serde.inner`.
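
To make the "resolved during operators' init call" idea concrete, here is a minimal sketch of a serde that only materializes `default.key.serde` when `configure()` runs. The class name and the bare reflection are illustrative assumptions; the real fix would additionally wrap the resolved serde in the windowed serde as described above.

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class WrapDefaultKeySerde<T> implements Serde<T> {

    private Serde<T> inner; // resolved from "default.key.serde" at configure() time

    @SuppressWarnings("unchecked")
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        final Object value = configs.get("default.key.serde");
        try {
            // The config value may be a Class or a class name string.
            final Class<?> serdeClass = value instanceof Class
                    ? (Class<?>) value
                    : Class.forName(String.valueOf(value));
            inner = (Serde<T>) serdeClass.getDeclaredConstructor().newInstance();
            inner.configure(configs, isKey);
        } catch (final Exception e) {
            throw new IllegalStateException("Could not resolve default.key.serde", e);
        }
    }

    @Override
    public Serializer<T> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<T> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void close() {
        if (inner != null) {
            inner.close();
        }
    }
}
{code}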

 

In addition to fixing this for TimeWindowed and SessionWindowed streams, we 
need to have good test coverage of the new code. There is clearly a blind spot 
in the tests, or we would have noticed this sooner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk11 #194

2019-01-09 Thread Apache Jenkins Server
See 




Re: [VOTE] [REMINDER] KIP-383 Pluggable interface for SSL Factory

2019-01-09 Thread Harsha
Hi All,
We are looking forward to this KIP. We'd appreciate it if others could take a 
look at the KIP and
vote on this thread.

Thanks
Harsha

On Fri, Dec 21, 2018, at 4:41 AM, Damian Guy wrote:
> must be my gmail playing up. This appears to be the DISCUSS thread to me...
> On Thu, 20 Dec 2018 at 18:42, Harsha  wrote:
> 
> > Damian,
> >This is the VOTE thread. There is a DISCUSS thread which
> > concluded in it.
> >
> > -Harsha
> >
> >
> > On Wed, Dec 19, 2018, at 5:04 AM, Pellerin, Clement wrote:
> > > I did that and nobody came.
> > > https://lists.apache.org/list.html?dev@kafka.apache.org:lte=1M:kip-383
> > > I don't understand why this feature is not more popular.
> > > It's the solution to one Jira and a work-around for a handful more Jiras.
> > >
> > > -Original Message-
> > > From: Damian Guy [mailto:damian@gmail.com]
> > > Sent: Wednesday, December 19, 2018 7:38 AM
> > > To: dev
> > > Subject: Re: [VOTE] [REMINDER] KIP-383 Pluggable interface for SSL
> > Factory
> > >
> > > Hi Clement,
> > >
> > > You should start a separate thread for the vote, i.e., one with a subject
> > > of [VOTE] KIP-383 ...
> > >
> > > Looks like you haven't done this?
> >


Delayed messages with skewed data when subscribing to many partitions

2019-01-09 Thread Sönke Liebau
Hi all,

we've just had a case where we suspect that messages get delayed from
being consumed under certain circumstances. I don't necessarily think
this is a bug, hence have not opened a jira yet but wanted to discuss
here - there's probably a best practice that I just don't know about.

The scenario is having one consumer that is subscribed to a large
number of partitions, some of which are very busy and some of which
only receive messages sporadically. When the consumer now sends a
fetchrequest for all subscribed partitions, the broker starts filling
the response partition by partition while honoring two parameters:

max.partition.fetch.bytes - controls the maximum size of the data that
is returned for one individual partition - default: 1 * 1024 * 1024 =
1048576 bytes
fetch.max.bytes - controls the overall maximum size of data that is
returned for the entire fetchrequest - default: 50 * 1024 * 1024 =
52428800 bytes

So by default a fetchresponse can contain data from a maximum of 50
partitions, which creates the possibility of "freezing out" partitions
if there are a lot of busy partitions in the subscriptions.
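
For reference, the two limits in (Java) consumer-config form; the second method is just the lower per-partition cap tried in the test below, not a general recommendation:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchSizeConfigExample {
    public static Properties defaults() {
        final Properties props = new Properties();
        // 1 MiB per partition and 50 MiB per fetch response: at most ~50 partitions per response.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
        return props;
    }

    public static Properties smallerPartitionChunks() {
        final Properties props = defaults();
        // 1 KiB per partition lets one 50 MiB response carry data for far more partitions,
        // at the cost of many more fetch requests when only a few partitions are busy.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024);
        return props;
    }
}
```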
I've created a small test for this to illustrate my concern:

Topics:
000 - 1 partition - 1 message
aaa - 100 partitions - 10 Mill. messages
bbb - 1000 partitions - 50 Mill. messages
mmm - 1 partition - 1 message
zzz - 100 partitions - 10 Mill messages

When I consume from these with default settings and simply print the
time I first receive a message from a topic I get the following:
Got first record from topic aaa after 747 ms
Got first record from topic bbb after 2764 ms
Got first record from topic zzz after 15068 ms
Got first record from topic 000 after 16588 ms
Got first record from topic mmm after 16588 ms

So as we can see the topics with only one partition get stuck behind
the larger topics with data to be read. I am unsure in what order the
broker iterates over the partitions, but I've always seen the same
general order in the output, so there seems to be some factor
influencing this.
One potential fix that I identified was to reduce the
max.partition.fetch.bytes parameter, so that more partitions can be
included in a fetchresponse. If I rerun the test with a value of 1024
I get:

Got first record from topic aaa after 5446 ms
Got first record from topic bbb after 5469 ms
Got first record from topic zzz after 5744 ms
Got first record from topic mmm after 5762 ms
Got first record from topic 000 after 5762 ms

Which looks much better, but I have doubts whether this is the actual
solution, as it could lead to an increase in the number of fetch
requests that are sent when only a few partitions have new data:
5 partitions with 10 MB of new data each would fit in 10 requests with
default settings, but need 10240 with my adjusted settings.

This topic is currently also being discussed in the thread on KIP-349
[1] but consensus seems to be that there is no real need for a feature
like this.

Are there common patterns to get around this? The obvious solution
would be scaling the load across more consumers of course, either by
adding them to the consumer group or by splitting the topics over
consumers, but that sort of just makes it a question of scale until it
may happen again.

Would it potentially be worthwhile looking into code changes to
improve handling for these edge cases?
For example, keeping track of the last time each partition returned data for a
consumer group and prioritizing the "oldest" partitions. This would
need memory on the broker though, which might turn out to be quite a
lot since it would scale with partition count and the number of consumer groups.
Alternatively some sort of feedback to the consumer could be added
about partitions that were not checked due to the limits, but that
would need a wire protocol change.
Perhaps a little consumer side logic that starts fragmenting fetch
requests if it notices that responses always have data from the
maximum number of partitions.

Best regards,
Sönke

[1] 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics


[jira] [Created] (KAFKA-7805) Use --bootstrap-server option in ducktape tests where applicable

2019-01-09 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7805:
-

 Summary: Use --bootstrap-server option in ducktape tests where 
applicable
 Key: KAFKA-7805
 URL: https://issues.apache.org/jira/browse/KAFKA-7805
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Viktor Somogyi


KIP-377 introduces the {{--bootstrap-server}} option and deprecates the 
{{--zookeeper}} option in {{kafka-topics.sh}}. It'd be nice to use the new 
option in the ducktape tests to gain higher test coverage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7804) Update the docs for KIP-377

2019-01-09 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7804:
-

 Summary: Update the docs for KIP-377
 Key: KAFKA-7804
 URL: https://issues.apache.org/jira/browse/KAFKA-7804
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Viktor Somogyi
Assignee: Viktor Somogyi


KIP-377 introduced the {{--bootstrap-server}} option to the {{kafka-topics.sh}} 
command. The documentation (examples) should be updated accordingly and a 
release note should be added.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSSION] KIP-412: Extend Admin API to support dynamic application log levels

2019-01-09 Thread Stanislav Kozlovski
Hey Ryanne, thanks for taking a look at the KIP!

I think that it is useful to specify the distinction between a standard
Kafka config and the log level configs. The log level can be looked at as a
separate resource as it does not change the behavior of the Kafka broker in
any way.
In terms of practical benefits, separating the two significantly eases this KIP's
implementation and users' implementations of AlterConfigPolicy (e.g. denying all
requests that try to alter log levels). We would also be able
to introduce a

On Wed, Jan 9, 2019 at 1:48 AM Ryanne Dolan  wrote:

> > To differentiate between the normal Kafka config settings and the
> application's log level settings, we will introduce a new resource type -
> BROKER_LOGGERS
>
> Stanislav, can you explain why log level wouldn't be a "normal Kafka config
> setting"?
>
> Ryanne
>
> On Tue, Jan 8, 2019, 4:26 PM Stanislav Kozlovski  wrote:
>
> > Hey there everybody,
> >
> > I'd like to start a discussion about KIP-412. Please take a look at the
> KIP
> > if you can, I would appreciate any feedback :)
> >
> > KIP: KIP-412
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels
> > >
> > JIRA: KAFKA-7800 
> >
> > --
> > Best,
> > Stanislav
> >
>


-- 
Best,
Stanislav
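
As an aside on the AlterConfigPolicy point above, a sketch of a "deny all log-level changes" policy might look like the following. Note that the BROKER_LOGGER resource type is only a KIP-412 proposal at this point, so the sketch compares the type by name rather than by an (as yet non-existent) enum constant.

```java
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.AlterConfigPolicy;

public class DenyLoggerChangesPolicy implements AlterConfigPolicy {

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void validate(final AlterConfigPolicy.RequestMetadata requestMetadata) {
        // Compare by name because ConfigResource.Type.BROKER_LOGGER is only proposed in KIP-412.
        if ("BROKER_LOGGER".equals(requestMetadata.resource().type().name())) {
            throw new PolicyViolationException("Changing application log levels is not allowed");
        }
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}
```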


Re: [Discuss] KIP-389: Enforce group.max.size to cap member metadata growth

2019-01-09 Thread Stanislav Kozlovski
Hey everybody,

I ran a quick benchmark and took some heap dumps to gauge how much memory
each consumer in a group is using, all done locally.
The setup was the following: 10 topics with 10 partitions each (100
partitions total) and one consumer group with 10 members, then expanded to
20 members.
Here are some notes of my findings in a public Google doc:
https://docs.google.com/document/d/1Z4aY5qg8lU2uNXzdgp_30_oJ9_I9xNelPko6GIQYXYk/edit?usp=sharing
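
As a rough way to turn numbers like the ones in that document into a limit, one can multiply the per-member footprint observed in a heap dump by the member count. The helper below is only a back-of-the-envelope sketch; the 1 KiB per member used in the example is a hypothetical figure, not a number from the benchmark.

```java
public final class GroupMemoryEstimate {

    private GroupMemoryEstimate() { }

    // Rough upper bound for one group's member metadata: members * bytes-per-member,
    // where bytes-per-member is whatever your own heap dump shows.
    public static long estimateGroupMetadataBytes(final int members, final long bytesPerMember) {
        return (long) members * bytesPerMember;
    }

    public static void main(final String[] args) {
        // Hypothetical: ~1 KiB per member would put a 10,000-member group at ~10 MiB.
        System.out.println(estimateGroupMetadataBytes(10_000, 1024L));
    }
}
```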


On Mon, Jan 7, 2019 at 10:51 PM Boyang Chen  wrote:

> Hey Stanislav,
>
> I think the time taken to rebalance is not linearly correlated with the number
> of consumers in our application. As for our current and future use cases,
> the main concern for Pinterest is still broker memory, not CPU,
> because a server crash caused by one application could have a cascading effect on
> all jobs.
> Do you want to drive a more detailed formula on how to compute the memory
> consumption against number of consumers within the group?
>
> In the meantime, I'm pretty much buying into the motivation of this KIP, so I
> think the follow-up work is just refinement to make the new config easy to
> use. We should be good
> to vote IMO.
>
> Best,
> Boyang
> 
> From: Stanislav Kozlovski 
> Sent: Monday, January 7, 2019 4:21 PM
> To: dev@kafka.apache.org
> Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> metadata growth
>
> Hey there,
>
> Per Gwen's comments, I slightly reworked the motivation section. Let me
> know if it's any better now
>
> I completely agree that it would be best if we were to add a recommended
> number to a typical consumer group size. There is a problem that timing the
> CPU usage and rebalance times of consumer groups is tricky. We can update
> the KIP with memory guidelines (e.g 1 consumer in a group uses X memory,
> therefore 100 use Y).
> I fear that the most useful recommendations though would be knowing the CPU
> impact of large consumer groups and the rebalance times. That is,
> unfortunately, tricky to test and measure.
>
> @Boyang, you had mentioned some numbers used in Pinterest. If available to
> you, would you be comfortable sharing the number of consumers you are using
> in a group and maybe the potential time it takes to rebalance it?
>
> I'd appreciate any anecdotes regarding consumer group sizes from the
> community
>
> Best,
> Stanislav
>
> On Thu, Jan 3, 2019 at 1:59 AM Boyang Chen  wrote:
>
> > Thanks Gwen for the suggestion! +1 on the guidance of defining
> > group.max.size. I guess a sample formula would be:
> > 2 * (# of brokers * average metadata cache size * 80%) / (# of consumer
> > groups * size of a single member metadata)
> >
> > if we assumed non-skewed partition assignment and pretty fair consumer
> > group consumption. The "2" is the 95th percentile of the normal distribution and
> > 80% is just to buffer some memory capacity, which are both open to
> > discussion. This config should be useful for the Kafka platform team to make
> > sure one extremely large consumer group won't bring down the whole cluster.
> >
> > What do you think?
> >
> > Best,
> > Boyang
> >
> > 
> > From: Gwen Shapira 
> > Sent: Thursday, January 3, 2019 2:59 AM
> > To: dev
> > Subject: Re: [Discuss] KIP-389: Enforce group.max.size to cap member
> > metadata growth
> >
> > Sorry for joining the fun late, but I think the problem we are solving
> > evolved a bit in the thread, and I'd like to have better understanding
> > of the problem before voting :)
> >
> > Both KIP and discussion assert that large groups are a problem, but
> > they are kinda inconsistent regarding why they are a problem and whose
> > problem they are...
> > 1. The KIP itself states that the main issue with large groups are
> > long rebalance times. Per my understanding, this is mostly a problem
> > for the application that consumes data, but not really a problem for
> > the brokers themselves, so broker admins probably don't and shouldn't
> > care about it. Also, my understanding is that this is a problem for
> > consumer groups, but not necessarily a problem for other group types.
> > 2. The discussion highlights the issue of "run away" groups that
> > essentially create tons of members needlessly and use up lots of
> > broker memory. This is something the broker admins will care about a
> > lot. And is also a problem for every group that uses coordinators and
> > not just consumers. And since the memory in question is the metadata
> > cache, it probably has the largest impact on Kafka Streams
> > applications, since they have lots of metadata.
> >
> > The solution proposed makes the most sense in the context of #2, so
> > perhaps we should update the motivation section of the KIP to reflect
> > that.
> >
> > The reason I'm probing here is that in my opinion we have to give our
> > users some guidelines on what a reasonable limit is (otherwise, how
> > will they know?). Calculating the impact of group-size on rebalance
> > t

Build failed in Jenkins: kafka-2.1-jdk8 #99

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Simplify handling of KafkaProducer serializer overrides

[jason] KAFKA-6833; Producer should await metadata for unknown partitions

--
[...truncated 916.14 KB...]
kafka.server.KafkaConfigTest > testInvalidCompressionType STARTED

kafka.server.KafkaConfigTest > testInvalidCompressionType PASSED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault STARTED

kafka.server.KafkaConfigTest > testAdvertiseHostNameDefault PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeMinutesProvided PASSED

kafka.server.KafkaConfigTest > testValidCompressionType STARTED

kafka.server.KafkaConfigTest > testValidCompressionType PASSED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid STARTED

kafka.server.KafkaConfigTest > testUncleanElectionInvalid PASSED

kafka.server.KafkaConfigTest > testListenerNamesWithAdvertisedListenerUnset 
STARTED

kafka.server.KafkaConfigTest > testListenerNamesWithAdvertisedListenerUnset 
PASSED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
STARTED

kafka.server.KafkaConfigTest > testLogRetentionTimeBothMinutesAndMsProvided 
PASSED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided STARTED

kafka.server.KafkaConfigTest > testLogRollTimeMsProvided PASSED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault STARTED

kafka.server.KafkaConfigTest > testUncleanLeaderElectionDefault PASSED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testInvalidAdvertisedListenersProtocol PASSED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled STARTED

kafka.server.KafkaConfigTest > testUncleanElectionEnabled PASSED

kafka.server.KafkaConfigTest > testInterBrokerVersionMessageFormatCompatibility 
STARTED

kafka.server.KafkaConfigTest > testInterBrokerVersionMessageFormatCompatibility 
PASSED

kafka.server.KafkaConfigTest > testAdvertisePortDefault STARTED

kafka.server.KafkaConfigTest > testAdvertisePortDefault PASSED

kafka.server.KafkaConfigTest > testVersionConfiguration STARTED

kafka.server.KafkaConfigTest > testVersionConfiguration PASSED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol STARTED

kafka.server.KafkaConfigTest > testEqualAdvertisedListenersProtocol PASSED

kafka.server.ListOffsetsRequestTest > testListOffsetsErrorCodes STARTED

kafka.server.ListOffsetsRequestTest > testListOffsetsErrorCodes PASSED

kafka.server.ListOffsetsRequestTest > testCurrentEpochValidation STARTED

kafka.server.ListOffsetsRequestTest > testCurrentEpochValidation PASSED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testValidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testErrorCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests STARTED

kafka.server.CreateTopicsRequestTest > testInvalidCreateTopicsRequests PASSED

kafka.server.CreateTopicsRequestTest > testNotController STARTED

kafka.server.CreateTopicsRequestTest > testNotController PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testValidCreateTopicsRequests 
PASSED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
STARTED

kafka.server.CreateTopicsRequestWithPolicyTest > testErrorCreateTopicsRequests 
PASSED

kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithDownConversionDisabled STARTED

kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithDownConversionDisabled PASSED

kafka.server.FetchRequestDownConversionConfigTest > testV1FetchFromReplica 
STARTED

kafka.server.FetchRequestDownConversionConfigTest > testV1FetchFromReplica 
PASSED

kafka.server.FetchRequestDownConversionConfigTest > 
testLatestFetchWithDownConversionDisabled STARTED

kafka.server.FetchRequestDownConversionConfigTest > 
testLatestFetchWithDownConversionDisabled PASSED

kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithTopicLevelOverrides STARTED

kafka.server.FetchRequestDownConversionConfigTest > 
testV1FetchWithTopicLevelOverrides PASSED

kafka.server.SaslApiVersionsRequestTest > 
testApiVersionsRequestWithUnsupportedVersion STARTED

kafka.server.SaslApiVersionsRequestTest > 
testApiVersionsRequestWithUnsupportedVersion PASSED

kafka.server.SaslApiVersionsRequestTest > 
testApiVersionsRequestBeforeSaslHandshakeRequest STARTED

kafka.server.SaslApiVersionsRequestTest > 
testApiVersionsRequestBeforeSaslHandshakeRequest PASSED

kafka.server.SaslApiVersionsRequestTest > 
testApiVersionsRequestAfterSaslHandshakeRequest START

Build failed in Jenkins: kafka-trunk-jdk8 #3296

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7799; Fix flaky test RestServerTest.testCORSEnabled (#6106)

--
[...truncated 4.50 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDiffere

Jenkins build is back to normal : kafka-2.0-jdk8 #211

2019-01-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2019-01-09 Thread Tom Bentley
Hi Anna and Mickael,

Anna, did you have any comments about the points I made?

Mickael, we really need the vote to be passed before there's even any work
to do. With the exception of Ismael, the KIP didn't seem to get the
attention of any of the other committers.

Kind regards,

Tom

On Thu, 13 Dec 2018 at 18:11, Tom Bentley  wrote:

> Hi Anna,
>
> Firstly, let me apologise again about having missed your previous emails
> about this.
>
> Thank you for the feedback. You raise some valid points about ambiguity.
> The problem with pulling the metadata into CreateTopicRequest and
> AlterTopicRequest is that you lose the benefit of being able to easily write
> a common policy across creation and alter cases. For example, with the
> proposed design the policy maker could write code like this (forgive my
> pseudo-Java)
>
> public void validateCreateTopic(requestMetadata, ...) {
>   commonPolicy(requestMetadata.requestedState());
> }
>
> public void validateAlterTopic(requestMetadata, ...) {
>   commonPolicy(requestMetadata.requestedState());
> }
>
> private void commonPolicy(RequestedTopicState requestedState) {
>   // ...
> }
>
> I think that's an important feature of the API because (I think) very
> often the policy maker is interested in defining the universe of prohibited
> configurations without really caring about whether the request is a create
> or an alter. Having a single RequestedTopicState for both create and
> alter means they can do that trivially in one place. Having different
> methods in the two Request classes prevents this and forces the policy
> maker to pick apart the different requestState objects before calling any
> common method(s).
>
> I think my intention at the time (and it's many months ago now, so I might
> not have remembered fully) was that RequestedTopicState would basically
> represent what the topic would look like after the requested changes were
> applied (I accept this isn't how it's Javadoc'd in the KIP), rather than
> representing the request itself. Thus if the request changed the assignment
> of some of the partitions and the policy maker was interested in precisely
> which partitions would be changed, and how, they would indeed have to
> compute that for themselves by looking up the current topic state from the
> cluster state and seeing how they differed. Indeed they'd have to do this
> diff even to figure out that the user was requesting a change to the topic
> assignment (or similarly for topic config, etc). To me this is acceptable
> because I think most people writing such policies are just interested in
> defining what is not allowed, so giving them a representation of the
> proposed topic state which they can readily check against is the most
> direct API. In this interpretation generatedReplicaAssignment() would
> just be some extra metadata annotating whether any difference between the
> current and proposed states was directly from the user, or generated on the
> broker. You're right that it's ambiguous when the request didn't actually
> change the assignment but I didn't envisage policy makers using it except
> when the assignments differed anyway. To me it would be acceptable to
> Javadoc this.
>
> Given this interpretation of RequestedTopicState as "what the topic would
> look like after the requested changes were applied" can you see any other
> problems with the proposal? Or do you have use cases where the policy maker
> is more interested in what the request is changing?
>
> Kind regards,
>
> Tom
>
> On Fri, 7 Dec 2018 at 08:41, Tom Bentley  wrote:
>
>> Hi Anna and Mickael,
>>
>> Sorry for remaining silent on this for so long. I should have time to
>> look at this again next week.
>>
>> Kind regards,
>>
>> Tom
>>
>> On Mon, 3 Dec 2018 at 10:11, Mickael Maison 
>> wrote:
>>
>>> Hi Tom,
>>>
>>> This is a very interesting KIP. If you are not going to continue
>>> working on it, would it be ok for us to grab it and complete it?
>>> Thanks
>>> On Thu, Jun 14, 2018 at 7:06 PM Anna Povzner  wrote:
>>> >
>>> > Hi Tom,
>>> >
>>> > Just wanted to check what you think about the comments I made in my
>>> last
>>> > message. I think this KIP is a big improvement to our current policy
>>> > interfaces, and really hope we can get this KIP in.
>>> >
>>> > Thanks,
>>> > Anna
>>> >
>>> > On Thu, May 31, 2018 at 3:29 PM, Anna Povzner 
>>> wrote:
>>> >
>>> > > Hi Tom,
>>> > >
>>> > >
>>> > > Thanks for the KIP. I am aware that the voting thread was started,
>>> but
>>> > > wanted to discuss couple of concerns here first.
>>> > >
>>> > >
>>> > > I think the coupling of
>>> RequestedTopicState#generatedReplicaAssignment()
>>> > > and TopicState#replicasAssignments() does not work well in case
>>> where the
>>> > > request deals only with a subset of partitions (e.g., add
>>> partitions) or no
>>> > > assignment at all (alter topic config). In particular:
>>> > >
>>> > > 1) Alter topic config use case: There is no replica assignment in the
>>> > > request, and

Build failed in Jenkins: kafka-trunk-jdk11 #193

2019-01-09 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-7799; Fix flaky test RestServerTest.testCORSEnabled (#6106)

--
[...truncated 2.25 MB...]

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfTimestampIsDifferentForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apa

[jira] [Created] (KAFKA-7803) Streams internal topics config is not updated when the code is changed

2019-01-09 Thread Tim Van Laer (JIRA)
Tim Van Laer created KAFKA-7803:
---

 Summary: Streams internal topics config is not updated when the 
code is changed
 Key: KAFKA-7803
 URL: https://issues.apache.org/jira/browse/KAFKA-7803
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Tim Van Laer


Considering the following state store definition:
{code:java}
ImmutableMap<String, String> changelogTopicConfig = new ImmutableMap.Builder<String, String>()
    .put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(100 * 1024 * 1024))
    .build();

builder.addStateStore(
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-example"), Serdes.String(), Serdes.String())
        .withLoggingEnabled(changelogTopicConfig)
);{code}
The configuration for a changelog topic (segment size, max message size...) is 
used when Kafka Streams creates the internal topic (see 
[InternalTopicManager|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java]).
 If I later decide to increase the segment size, I would update the value in 
the code. However, Kafka Streams currently won't apply this code change to the 
internal topic config. This causes a confusing state where the code is 
different from the actual runtime.  

It would be convenient if Kafka Streams could reflect those changes to the 
internal topic by updating the topic configuration. 
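
Until/unless Streams does this itself, a possible operator-side workaround (an illustration only, assuming the changelog follows the <application.id>-<store name>-changelog naming convention; the application id "my-app" below is made up) is to push the new value with the admin client. Note that alterConfigs replaces the topic's whole override set, so every override to keep must be included:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class UpdateChangelogSegmentBytes {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Hypothetical application.id "my-app" plus the store name from the snippet above.
        final ConfigResource changelog =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-app-store-example-changelog");
        final Config desired = new Config(Collections.singleton(
                new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(100 * 1024 * 1024))));

        final AdminClient admin = AdminClient.create(props);
        try {
            admin.alterConfigs(Collections.singletonMap(changelog, desired)).all().get();
        } finally {
            admin.close();
        }
    }
}
{code}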



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2019-01-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6833.

   Resolution: Fixed
Fix Version/s: 2.1.1
   2.2.0

> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Assignee: Bob Barrett
>Priority: Minor
> Fix For: 2.2.0, 2.1.1
>
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK, and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics which gets 
> updated after the broker metadata is updated. If an application adds 
> partitions to a topic, and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application has context that it created the topics, 
> it might be worthwhile to consider if a more specific exception can be thrown 
> instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This could allow the application to be able to interpret such an error, and 
> act accordingly.
> EDIT: Correct "create topics" to "adds partitions to a topic".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7799) Fix flaky test RestServerTest.testCORSEnabled

2019-01-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7799.

Resolution: Fixed

> Fix flaky test RestServerTest.testCORSEnabled
> -
>
> Key: KAFKA-7799
> URL: https://issues.apache.org/jira/browse/KAFKA-7799
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Starting to see this failure quite a lot, locally and on jenkins:
> {code}
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled
> Failing for the past 7 builds (Since Failed#18600 )
> Took 0.7 sec.
> Error Message
> java.lang.AssertionError: expected: but was:
> Stacktrace
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.checkCORSRequest(RestServerTest.java:221)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled(RestServerTest.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
> {code}
> If it helps, I see an uncaught exception in the stdout:
> {code}
> [2019-01-08 19:35:23,664] ERROR Uncaught exception in REST call to 
> /connector-plugins/FileStreamSource/validate 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> javax.ws.rs.NotFoundException: HTTP 404 Not Found
>   at 
> org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:274)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
>   at 
> org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
>   at 
> org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
>   at 
> org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)