[jira] [Created] (KAFKA-7786) Fast update of leader epoch may stall partition fetching due to FENCED_LEADER_EPOCH

2019-01-03 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7786:
---

 Summary: Fast update of leader epoch may stall partition fetching 
due to FENCED_LEADER_EPOCH
 Key: KAFKA-7786
 URL: https://issues.apache.org/jira/browse/KAFKA-7786
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
Reporter: Anna Povzner


KIP-320/KAFKA-7395 added a FENCED_LEADER_EPOCH error response to an 
OffsetsForLeaderEpoch request if the epoch in the request is lower than the 
broker's leader epoch. ReplicaFetcherThread builds an OffsetsForLeaderEpoch 
request under _partitionMapLock_, sends the request outside the lock, and then 
processes the response under _partitionMapLock_. The broker may receive 
LeaderAndIsr with the same leader but the next leader epoch, and remove and 
re-add the partition to the fetcher thread (with partition state reflecting the 
updated leader epoch), all while the OffsetsForLeaderEpoch request (with the 
old leader epoch) is still outstanding / waiting for the lock to process the 
OffsetsForLeaderEpoch response. As a result, the partition gets removed from 
partitionStates and this broker will not fetch this partition until the 
next LeaderAndIsr, which may take a while. We see a log message like this:

[2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current 
leader. Will await the new LeaderAndIsr state before resuming fetching. 
(kafka.server.ReplicaFetcherThread)
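
To make the window concrete, here is a minimal sketch of the locking pattern described above (hypothetical names, not the actual ReplicaFetcherThread code): the send happens outside _partitionMapLock_, so a LeaderAndIsr update can slip in between building the request and handling the response.

{code}
import java.util.concurrent.locks.ReentrantLock;

public class FetcherRaceSketch {
    private final ReentrantLock partitionMapLock = new ReentrantLock();

    void maybeFetchEpochEndOffsets() {
        Object request;
        partitionMapLock.lock();
        try {
            // The snapshot taken here still carries the old leader epoch.
            request = buildOffsetsForLeaderEpochRequest();
        } finally {
            partitionMapLock.unlock();
        }

        // Outside the lock: a LeaderAndIsr with a newer epoch can remove and
        // re-add the partition before the response below is handled.
        Object response = send(request);

        partitionMapLock.lock();
        try {
            handleResponse(response); // may now observe FENCED_LEADER_EPOCH
        } finally {
            partitionMapLock.unlock();
        }
    }

    private Object buildOffsetsForLeaderEpochRequest() { return new Object(); }
    private Object send(Object request) { return new Object(); }
    private void handleResponse(Object response) { }
}
{code}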

We saw this happen with 
kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True.
 This test does partition re-assignment while bouncing 2 out of 4 total 
brokers. When the failure happened, each bounced broker was also a controller. 
Because of re-assignment, the controller updates the leader epoch without 
updating the leader on controller change or on broker startup, so we see 
several leader epoch changes without a leader change, which increases the 
likelihood of the race condition described above.

Here is the exact sequence of events in this test (around the failure):

We have 4 brokers: 1, 2, 3, and 4. Partition re-assignment is started for 
test_topic-17 [2, 4, 1] -> [3, 1, 2]. At time t0, the leader of test_topic-17 
is broker 2.
 # clean shutdown of broker 3, which is also a controller
 # broker 4 becomes controller, continues re-assignment and updates leader 
epoch for test_topic-17 to 6 (with same leader)
 # broker 2 (leader of test_topic-17) receives new leader epoch: “test_topic-17 
starts at Leader Epoch 6 from offset 1388. Previous Leader Epoch was: 5”
 # broker 3 is started again after clean shutdown
 # controller sees broker 3 startup, and sends LeaderAndIsr(leader epoch 6) to 
broker 3
 # controller updates leader epoch to 7
 # broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 7: 
“test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader Epoch 
was: 6”
 # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from 
controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6” 
and sends OffsetsForLeaderEpoch request to broker 2
 # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from 
controller; removes the partition from the fetcher thread and adds it back, 
executing AbstractFetcherThread.addPartitions(), which updates the partition 
state with leader epoch 7
 # broker 3 receives FENCED_LEADER_EPOCH in response to 
OffsetsForLeaderEpoch(leader epoch 6), because the leader received LeaderAndIsr 
for leader epoch 7 before it got OffsetsForLeaderEpoch(leader epoch 6) from 
broker 3. As a result, broker 3 removes the partition from partitionStates and 
does not fetch until the controller updates the leader epoch and sends another 
LeaderAndIsr for this partition to broker 3. The test fails because 
re-assignment does not finish in time (due to broker 3 not fetching).

 

One way to address this is to add more state to PartitionFetchState. However, 
that may introduce other race conditions. A cleaner way, I think, is to return 
the leader epoch in the OffsetsForLeaderEpoch response along with the 
FENCED_LEADER_EPOCH error, and then ignore the error if the partition state 
already contains a higher leader epoch. The advantage is less state to 
maintain; the disadvantage is that it requires bumping the inter-broker 
protocol.
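
As a rough sketch of the proposed check (hypothetical names, not the actual fetcher code): the response handler compares the epoch it sent against the epoch currently tracked in the partition state and ignores a stale fencing error.

{code}
import java.util.Map;

public class FencedEpochCheckSketch {
    // partitionStateEpochs maps each partition to the leader epoch currently
    // tracked by the fetcher (updated by addPartitions on LeaderAndIsr).
    void handleFencedLeaderEpoch(String topicPartition,
                                 int requestedEpoch,
                                 Map<String, Integer> partitionStateEpochs) {
        Integer currentEpoch = partitionStateEpochs.get(topicPartition);
        if (currentEpoch != null && currentEpoch > requestedEpoch) {
            // The partition was re-added with a newer epoch while the request
            // was in flight; keep it in partitionStates and retry with the
            // new epoch instead of waiting for another LeaderAndIsr.
            return;
        }
        // Genuinely fenced: drop the partition and await the next LeaderAndIsr.
        partitionStateEpochs.remove(topicPartition);
    }
}
{code}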



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-2.1-jdk8 #94

2019-01-03 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

2019-01-03 Thread Richard Yu
Hi Boyang,

Interesting article, though something crossed my mind. When skipping bad
records, we couldn't go back and reprocess them to guarantee ordering,
i.e. neither exactly-once nor at-least-once would be supported, only
at-most-once. Also, in Kafka, individually acking every single record
results in terrible latency (from what I heard). We actually discussed
something like this in
https://issues.apache.org/jira/browse/KAFKA-7432. It might give you some
insight since it is a related issue.

I hope this helps,
Richard




On Thu, Jan 3, 2019 at 7:29 PM Boyang Chen  wrote:

> Hey Richard,
>
> thanks for the explanation. Recently I read an interesting blog post<
> https://streaml.io/blog/pulsar-streaming-queuing> from Apache Pulsar
> (written long time ago), where they define the concept of individual ack
> which means we could skip records and leave certain records remain on the
> queue for late processing. This should be something similar to KIP-408
> which also shares some motivations for us to invest.
>
> Boyang
>
> 
> From: Richard Yu 
> Sent: Friday, January 4, 2019 5:42 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka
> Streams
>
> Hi all,
>
> Just bumping this KIP. Would be great if we got some discussion.
>
>
> On Sun, Dec 30, 2018 at 5:13 PM Richard Yu 
> wrote:
>
> > Hi all,
> >
> > I made some recent changes to the KIP. It should be more relevant with
> the
> > issue now (involves Processor API in detail).
> > It would be great if you could comment.
> >
> > Thanks,
> > Richard
> >
> > On Wed, Dec 26, 2018 at 10:01 PM Richard Yu 
> > wrote:
> >
> >> Hi all,
> >>
> >> Just changing the title of the KIP. Discovered it wasn't right.
> >> Thats about it. :)
> >>
> >> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu 
> >> wrote:
> >>
> >>> Sorry, just making a correction.
> >>>
> >>> Even if we are processing records out of order, we will still have to
> >>> checkpoint offset ranges.
> >>> So it doesn't really change anything even if we are doing in-order
> >>> processing.
> >>>
> >>> Thinking this over, I'm leaning slightly towards maintaining the
> >>> ordering guarantee.
> >>> Although when implementing this change, there might be some kinks that
> >>> we have not thought about which could throw a monkey wrench into the
> works.
> >>>
> >>> But definitely worth trying out,
> >>> Richard
> >>>
> >>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu  >
> >>> wrote:
> >>>
>  Hi Boyang,
> 
>  I could see where you are going with this. Well, I suppose I should
>  have added this to alternatives, but I might as well mention it now.
> 
>  It had crossed my mind that we consider returning in-order even if
>  there are multiple threads processing on the same thread. But for
> this to
>  happen, we must block for the offsets in-between which have not been
>  processed yet. For example, offsets 1-50 are being processed by
> thread1,
>  while the offsets 51 - 100 are being processed by thread2. We will
> have to
>  wait for thread1 to finish processing its offsets first before we
> return
>  the records processed by thread2. So in other words, once thread1 is
> done,
>  thread2's work up to that point will be returned in one go, but not
> before
>  that.
> 
>  I suppose this could work, but the client will have to wait some time
>  before the advantages of multithreaded processing can be seen (i.e.
> the
>  first thread has to finish processing its segment of the records first
>  before any others are returned to guarantee ordering). Another point I
>  would like to make is that the threads are *asynchronous. *So for us
>  to know when a thread is done processing a certain segment, we will
>  probably have a similar policy to how getMetadataAsync() works (i.e.
> have a
>  parent thread be notified of when the children threads are done).
>  [image: image.png]
>  Just pulling this from the KIP. But instead, we would apply this to
>  metadata segments instead of just a callback.
>  I don't know whether or not the tradeoffs are acceptable to the
> client.
>  Ordering could be guaranteed, but it would be hard to do. For
> example, if
>  there was a crash, we might lose track of which offsets numbers and
> ranges
>  we are processing for each child thread, so somehow we need to find a
> way
>  to checkpoint those as well (like committing them to a Kafka topic).
> 
>  Let me know your thoughts on this approach. It would work, but the
>  implementation details could be a mess.
> 
>  Cheers,
>  Richard
> 
> 
> 
> 
> 
>  On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen 
>  wrote:
> 
> > Hey Richard,
> >
> > thanks for the explanation! After some thinking, I do understand more
> > about this KIP. The motivation was to increase 

Build failed in Jenkins: kafka-trunk-jdk8 #3279

2019-01-03 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-7734: Metrics tags should use LinkedHashMap to guarantee 
ordering

--
[...truncated 2.25 MB...]


Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

2019-01-03 Thread Boyang Chen
Hey Richard,

thanks for the explanation. Recently I read an interesting blog 
post from Apache Pulsar 
(written a long time ago), where they define the concept of an individual ack, 
which means we could skip records and leave certain records on the queue for 
later processing. This is similar to KIP-408, which also shares some 
motivations for us to invest.
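
To sketch what individual acks imply for committing (a purely hypothetical illustration, not Pulsar's or the KIP's actual design): acks can arrive out of order, but the commit position can only advance over the contiguous prefix of acked offsets.

import java.util.TreeSet;

public class IndividualAckTracker {
    private final TreeSet<Long> acked = new TreeSet<>();
    private long committablePosition; // every offset below this is acked

    public IndividualAckTracker(long startOffset) {
        this.committablePosition = startOffset;
    }

    public synchronized void ack(long offset) {
        acked.add(offset);
        // Advance the committable position over any contiguous run of acks.
        while (!acked.isEmpty() && acked.first() == committablePosition) {
            acked.pollFirst();
            committablePosition++;
        }
    }

    public synchronized long committableOffset() {
        return committablePosition;
    }
}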

Boyang


From: Richard Yu 
Sent: Friday, January 4, 2019 5:42 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

Hi all,

Just bumping this KIP. Would be great if we got some discussion.


On Sun, Dec 30, 2018 at 5:13 PM Richard Yu 
wrote:

> Hi all,
>
> I made some recent changes to the KIP. It should be more relevant with the
> issue now (involves Processor API in detail).
> It would be great if you could comment.
>
> Thanks,
> Richard
>
> On Wed, Dec 26, 2018 at 10:01 PM Richard Yu 
> wrote:
>
>> Hi all,
>>
>> Just changing the title of the KIP. Discovered it wasn't right.
>> Thats about it. :)
>>
>> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu 
>> wrote:
>>
>>> Sorry, just making a correction.
>>>
>>> Even if we are processing records out of order, we will still have to
>>> checkpoint offset ranges.
>>> So it doesn't really change anything even if we are doing in-order
>>> processing.
>>>
>>> Thinking this over, I'm leaning slightly towards maintaining the
>>> ordering guarantee.
>>> Although when implementing this change, there might be some kinks that
>>> we have not thought about which could throw a monkey wrench into the works.
>>>
>>> But definitely worth trying out,
>>> Richard
>>>
>>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu 
>>> wrote:
>>>
 Hi Boyang,

 I could see where you are going with this. Well, I suppose I should
 have added this to alternatives, but I might as well mention it now.

 It had crossed my mind that we consider returning in-order even if
 there are multiple threads processing on the same thread. But for this to
 happen, we must block for the offsets in-between which have not been
 processed yet. For example, offsets 1-50 are being processed by thread1,
 while the offsets 51 - 100 are being processed by thread2. We will have to
 wait for thread1 to finish processing its offsets first before we return
 the records processed by thread2. So in other words, once thread1 is done,
 thread2's work up to that point will be returned in one go, but not before
 that.

 I suppose this could work, but the client will have to wait some time
 before the advantages of multithreaded processing can be seen (i.e. the
 first thread has to finish processing its segment of the records first
 before any others are returned to guarantee ordering). Another point I
 would like to make is that the threads are *asynchronous. *So for us
 to know when a thread is done processing a certain segment, we will
 probably have a similar policy to how getMetadataAsync() works (i.e. have a
 parent thread be notified of when the children threads are done).
 [image: image.png]
 Just pulling this from the KIP. But instead, we would apply this to
 metadata segments instead of just a callback.
 I don't know whether or not the tradeoffs are acceptable to the client.
 Ordering could be guaranteed, but it would be hard to do. For example, if
 there was a crash, we might lose track of which offsets numbers and ranges
 we are processing for each child thread, so somehow we need to find a way
 to checkpoint those as well (like committing them to a Kafka topic).

 Let me know your thoughts on this approach. It would work, but the
 implementation details could be a mess.

 Cheers,
 Richard





 On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen 
 wrote:

> Hey Richard,
>
> thanks for the explanation! After some thinking, I do understand more
> about this KIP. The motivation was to increase the throughput and put 
> heavy
> lifting RPC calls or IO operations to the background. While I feel the
> ordering is hard to guarantee for async task, it is better to be
> configurable for the end users.
>
> An example use case I could think of is: for every 500 records
> processed, we need an RPC to external storage that takes non-trivial time,
> and before its finishing all 499 records before it shouldn't be visible to
> the end user. In such case, we need to have fine-grained control on the
> visibility of downstream consumer so that our async task is planting a
> barrier while still make 499 records non-blocking process and send to
> downstream. So eventually when the heavy RPC is done, we commit this 
> record
> to remove the barrier and make all 500 records available for downstream. 

Build failed in Jenkins: kafka-trunk-jdk8 #3278

2019-01-03 Thread Apache Jenkins Server
See 


Changes:

[harsha] KAFKA-6431: Shard purgatory to mitigate lock contention (#5338)

--
[...truncated 2.25 MB...]

[jira] [Resolved] (KAFKA-7734) Metrics tags should use LinkedHashMap to guarantee ordering

2019-01-03 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7734.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Metrics tags should use LinkedHashMap to guarantee ordering
> ---
>
> Key: KAFKA-7734
> URL: https://issues.apache.org/jira/browse/KAFKA-7734
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: lambdaliu
>Priority: Major
> Fix For: 2.2.0
>
>
> Today we store metrics tags inside MetricName from various places, and many 
> of them are using `HashMap`. However, for metrics reporters like JMXReporter, 
> the mBeanName is constructed by looping over `metricName.tags().entrySet()` 
> which does not guarantee ordering. This resulted in a few places where the 
> mBeanName string is not as expected, e.g. we document the Streams cache metrics 
> as 
> {code}
> kafka.streams:type=stream-record-cache-metrics,client-id=([-.\w]+),task-id=([-.\w]+),record-cache-id=([-.\w]+)
> {code}
> However, what I've seen from JMXReporter is, for example:
> {code}
> kafka.streams:type=stream-record-cache-metrics,record-cache-id=all,client-id=streams-saak-test-client-StreamThread-1,task-id=1_3
> {code}
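
For illustration (a small standalone sketch, not the Kafka metrics code itself): once an mBean name is built by iterating the tag map, a LinkedHashMap preserves insertion order while a plain HashMap gives no ordering guarantee.

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class TagOrderingExample {
    // Builds an mBean-style name by appending each tag in iteration order.
    static String mBeanName(String type, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder("kafka.streams:type=").append(type);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("client-id", "streams-saak-test-client-StreamThread-1");
        ordered.put("task-id", "1_3");
        ordered.put("record-cache-id", "all");
        // Insertion order is preserved, so the name matches the documented form.
        System.out.println(mBeanName("stream-record-cache-metrics", ordered));
        // With a plain HashMap the iteration order, and hence the name, is undefined.
        System.out.println(mBeanName("stream-record-cache-metrics", new HashMap<>(ordered)));
    }
}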



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-345: Introduce static membership protocol to reduce consumer rebalances

2019-01-03 Thread Harsha
+1 (binding).

Thanks,
Harsha

On Wed, Jan 2, 2019, at 9:59 AM, Boyang Chen wrote:
> Thanks Jason for the comment! I answered it on the discuss thread.
> 
> Folks, could we continue the vote for this KIP? This is a very critical 
> improvement for our streaming system
> stability and we need to get things rolling right at the start of 2019.
> 
> Thank you for your time!
> Boyang
> 
> 
> From: Jason Gustafson 
> Sent: Tuesday, December 18, 2018 7:40 AM
> To: dev
> Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to 
> reduce consumer rebalances
> 
> Hi Boyang,
> 
> Thanks, the KIP looks good. Just one comment.
> 
> The new schema for the LeaveGroup request is slightly odd since it is
> handling both the single consumer use case and the administrative use case.
> I wonder if we could make it consistent from a batching perspective.
> 
> In other words, instead of this:
> LeaveGroupRequest => GroupId MemberId [GroupInstanceId]
> 
> Maybe we could do this:
> LeaveGroupRequest => GroupId [GroupInstanceId MemberId]
> 
> For dynamic members, GroupInstanceId could be empty, which is consistent
> with JoinGroup. What do you think?
> 
> Also, just for clarification, what is the expected behavior if the current
> memberId of a static member is passed to LeaveGroup? Will the static member
> be removed? I know the consumer will not do this, but we'll still have to
> handle the case on the broker.
> 
> Best,
> Jason
> 
> 
> On Mon, Dec 10, 2018 at 11:54 PM Boyang Chen  wrote:
> 
> > Thanks Stanislav!
> >
> > Get Outlook for iOS
> >
> > 
> > From: Stanislav Kozlovski 
> > Sent: Monday, December 10, 2018 11:28 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to
> > reduce consumer rebalances
> >
> > This is great work, Boyang. Thank you very much.
> >
> > +1 (non-binding)
> >
> > On Mon, Dec 10, 2018 at 6:09 PM Boyang Chen  wrote:
> >
> > > Hey there, could I get more votes on this thread?
> > >
> > > Thanks for the vote from Mayuresh and Mike :)
> > >
> > > Best,
> > > Boyang
> > > 
> > > From: Mayuresh Gharat 
> > > Sent: Thursday, December 6, 2018 10:53 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-345: Introduce static membership protocol to
> > > reduce consumer rebalances
> > >
> > > +1 (non-binding)
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Dec 4, 2018 at 6:58 AM Mike Freyberger <
> > mike.freyber...@xandr.com>
> > > wrote:
> > >
> > > > +1 (non binding)
> > > >
> > > > On 12/4/18, 9:43 AM, "Patrick Williams" <
> > patrick.willi...@storageos.com
> > > >
> > > > wrote:
> > > >
> > > > Pls take me off this VOTE list
> > > >
> > > > Best,
> > > >
> > > > Patrick Williams
> > > >
> > > > Sales Manager, UK & Ireland, Nordics & Israel
> > > > StorageOS
> > > > +44 (0)7549 676279
> > > > patrick.willi...@storageos.com
> > > >
> > > > 20 Midtown
> > > > 20 Proctor Street
> > > > Holborn
> > > > London WC1V 6NX
> > > >
> > > > Twitter: @patch37
> > > > LinkedIn: linkedin.com/in/patrickwilliams4
> > > >
> > > > https://slack.storageos.com/
> > > >
> > > >
> > > >
> > > > On 03/12/2018, 17:34, "Guozhang Wang"  wrote:
> > > >
> > > > Hello Boyang,
> > > >
> > > > I've browsed through the new wiki and there are still a couple of
> > > > minor
> > > > things to notice:
> > > >
> > > > 1. RemoveMemberFromGroupOptions seems not defined anywhere.
> > > >
> > > > 2. LeaveGroupRequest added a list of group instance id, but still
> > > > keep the
> > > > member id as a singleton; is that intentional? I think to make
> > > the
> > > > protocol
> > > > consistent both member id and instance ids could be plural.
> > > >
> > > > 3. About the *kafka-remove-member-from-group.sh *tool, I'm
> > > > wondering if we
> > > > can defer adding this while just add the corresponding calls of
> > > the
> > > > LeaveGroupRequest inside Streams until we have used it in
> > > > production and
> > > > hence have a better understanding on how flexible or extensible
> > > if
> > > > we want
> > > > to add any cmd tools. The rationale is that if we do not
> > > > necessarily need
> > > > it now, we can always add it later with a more think-through API
> > > > design,
> > > > but if we add the tool in a rush, we may need to extend or modify
> > > > it soon
> > > > after we realize its 

Re: Tests to emulate leader change(investigation of KAFKA-7680)

2019-01-03 Thread Guozhang Wang
Hello Nikolay,

I've left a comment on the ticket.


Guozhang

On Sun, Dec 23, 2018 at 11:54 PM Nikolay Izhikov 
wrote:

> Hello, Guys.
>
> I started investigating KAFKA-7680 [1]
>
> Jun Rao wrote that this issue can be reproduced with leader change.
>
> Please give me some advice.
> How can I emulate a leader change in tests?
>
> Do we have some existing tests I can look into?
>
> [1] https://issues.apache.org/jira/browse/KAFKA-7680
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-408: Add Asynchronous Processing to Kafka Streams

2019-01-03 Thread Richard Yu
Hi all,

Just bumping this KIP. Would be great if we got some discussion.


On Sun, Dec 30, 2018 at 5:13 PM Richard Yu 
wrote:

> Hi all,
>
> I made some recent changes to the KIP. It should be more relevant with the
> issue now (involves Processor API in detail).
> It would be great if you could comment.
>
> Thanks,
> Richard
>
> On Wed, Dec 26, 2018 at 10:01 PM Richard Yu 
> wrote:
>
>> Hi all,
>>
>> Just changing the title of the KIP. Discovered it wasn't right.
>> Thats about it. :)
>>
>> On Mon, Dec 24, 2018 at 7:57 PM Richard Yu 
>> wrote:
>>
>>> Sorry, just making a correction.
>>>
>>> Even if we are processing records out of order, we will still have to
>>> checkpoint offset ranges.
>>> So it doesn't really change anything even if we are doing in-order
>>> processing.
>>>
>>> Thinking this over, I'm leaning slightly towards maintaining the
>>> ordering guarantee.
>>> Although when implementing this change, there might be some kinks that
>>> we have not thought about which could throw a monkey wrench into the works.
>>>
>>> But definitely worth trying out,
>>> Richard
>>>
>>> On Mon, Dec 24, 2018 at 6:51 PM Richard Yu 
>>> wrote:
>>>
 Hi Boyang,

 I could see where you are going with this. Well, I suppose I should
 have added this to alternatives, but I might as well mention it now.

 It had crossed my mind that we consider returning in-order even if
 there are multiple threads processing on the same thread. But for this to
 happen, we must block for the offsets in-between which have not been
 processed yet. For example, offsets 1-50 are being processed by thread1,
 while the offsets 51 - 100 are being processed by thread2. We will have to
 wait for thread1 to finish processing its offsets first before we return
 the records processed by thread2. So in other words, once thread1 is done,
 thread2's work up to that point will be returned in one go, but not before
 that.

 I suppose this could work, but the client will have to wait some time
 before the advantages of multithreaded processing can be seen (i.e. the
 first thread has to finish processing its segment of the records first
 before any others are returned to guarantee ordering). Another point I
 would like to make is that the threads are *asynchronous. *So for us
 to know when a thread is done processing a certain segment, we will
 probably have a similar policy to how getMetadataAsync() works (i.e. have a
 parent thread be notified of when the children threads are done).
 [image: image.png]
 Just pulling this from the KIP. But instead, we would apply this to
 metadata segments instead of just a callback.
 I don't know whether or not the tradeoffs are acceptable to the client.
 Ordering could be guaranteed, but it would be hard to do. For example, if
 there was a crash, we might lose track of which offsets numbers and ranges
 we are processing for each child thread, so somehow we need to find a way
 to checkpoint those as well (like committing them to a Kafka topic).

 Let me know your thoughts on this approach. It would work, but the
 implementation details could be a mess.

 Cheers,
 Richard





 On Mon, Dec 24, 2018 at 4:59 PM Boyang Chen 
 wrote:

> Hey Richard,
>
> thanks for the explanation! After some thinking, I do understand more
> about this KIP. The motivation was to increase the throughput and put 
> heavy
> lifting RPC calls or IO operations to the background. While I feel the
> ordering is hard to guarantee for async task, it is better to be
> configurable for the end users.
>
> An example use case I could think of is: for every 500 records
> processed, we need an RPC to external storage that takes non-trivial time,
> and before its finishing all 499 records before it shouldn't be visible to
> the end user. In such case, we need to have fine-grained control on the
> visibility of downstream consumer so that our async task is planting a
> barrier while still make 499 records non-blocking process and send to
> downstream. So eventually when the heavy RPC is done, we commit this 
> record
> to remove the barrier and make all 500 records available for downstream. 
> So
> here we still need to guarantee the ordering within 500 records, while in
> the same time consumer semantic has nothing to change.
>
> Am I making the point clear here? Just want have more discussion on
> the ordering guarantee since I feel it wouldn't be a good idea to break
> consumer ordering guarantee by default.
>
> Best,
> Boyang
>
> 
> From: Richard Yu 
> Sent: Saturday, December 22, 2018 9:08 AM
> To: dev@kafka.apache.org
> Subject: Re: KIP-408: Add Asynchronous Processing to Kafka Streams
>
> Hi Boyang,

Re: [DISCUSS] KIP-262 Metadata should include the number of state stores for task

2019-01-03 Thread Richard Yu
Hi Matthias,

I guess this is no longer necessary; I am open to anything, honestly.
I suppose we should close it (if it's not already).


On Fri, Oct 19, 2018 at 11:06 AM Matthias J. Sax 
wrote:

> Any thought on my last email about discarding this KIP?
>
>
> -Matthias
>
> On 9/14/18 11:44 AM, Matthias J. Sax wrote:
> > Hi,
> >
> > we recently had a discussion on a different ticket to reduce the size of
> > the metadata we need to send:
> > https://issues.apache.org/jira/browse/KAFKA-7149
> >
> > It seems, that we actually don't need to include the number of stores in
> > the metadata, but that we can compute the number of stores locally on
> > each instance.
> >
> > With this insight, we should still try to exploit this knowledge during
> > task assignment, however, this would be an internal change that does not
> > require a KIP. Thus, I think that we can discard this KIP.
> >
> > Thoughts?
> >
> >
> > -Matthias
> >
> > On 6/10/18 5:20 PM, Matthias J. Sax wrote:
> >> Richard,
> >>
> >> KIP-268 got merged and thus this KIP is unblocked.
> >>
> >> I just re-read it and think it needs some updates with regard to the
> >> upgrade path (ie, you should mention why upgrading is covered).
> >>
> >> It would also be useful to discuss how the store information is used
> >> during assignment. Atm, the KIP only discussed that the information
> >> should be added, but this is only half of the story from my point of
> view.
> >>
> >>
> >> -Matthias
> >>
> >> On 3/22/18 9:15 PM, Matthias J. Sax wrote:
> >>> Hi Richard,
> >>>
> >>> with KIP-268 in place (should be accepted soon) the upgrade path is
> >>> covered. Thus, you can update your KIP accordingly, referring to
> KIP-268.
> >>>
> >>> Can you also update your KIP similar to KIP-268 to cover the old and
> new
> >>> metadata format?
> >>>
> >>> Thanks!
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 2/24/18 4:07 PM, Richard Yu wrote:
>  I didn't really get what "upgrade strategy" was at the time that
> Guozhang
>  mentioned it, so I wrote the above dialogue from my first
> understanding. I
>  changed it to "upgrade strategy must be provided". Currently,
> however, I do
>  not have anything in mind to facilitate upgrading older Kafka
> brokers. If
>  you have anything in mind, please let me know.
> 
> 
> 
> 
> 
> 
>  On Sat, Feb 24, 2018 at 3:58 PM, Matthias J. Sax <
> matth...@confluent.io>
>  wrote:
> 
> > Thanks a lot for this KIP.
> >
> > I am not sure what you mean by
> >
> >> which could potentially break older versions of Kafka brokers
> >
> > The metadata that is exchange, is not interpreted by the brokers. The
> > problem with upgrading the metadata format affect only Kafka Streams
> > instances.
> >
> > If we don't provide an upgrade strategy, changing the metadata format
> > required to stop all running application instances, before the
> instances
> > can be restarted with the new code. However, this implies downtime
> for
> > an application and is thus not acceptable.
> >
> >
> > -Matthias
> >
> >
> > On 2/24/18 11:11 AM, Richard Yu wrote:
> >> Hi all,
> >>
> >> I would like to discuss a KIP I've submitted :
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 262%3A+Metadata+should+include+number+of+state+stores+for+task
> >>
> >>
> >> Regards,
> >> Richard Yu
> >>
> >
> >
> 
> >>>
> >>
> >
>
>


Re: [DISCUSS] KIP-360: Improve handling of unknown producer

2019-01-03 Thread Guozhang Wang
You're right about the dangling txn since it will actually block
read-committed consumers from proceeding at all. I'd agree that since this
is a very rare case, we can consider fixing it not via broker-side logic
but via tooling in future work.
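
For reference, the consumers that get stuck are simply those configured with isolation.level=read_committed; a minimal sketch (topic and bootstrap address are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Only committed transactional records are returned; an open (or
        // dangling) transaction holds back the last stable offset.
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}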

I've also discovered some related error handling logic inside the producer that
may be addressed together with this KIP (since it is mostly about internal
implementation, the wiki itself does not need to be modified):

https://stackoverflow.com/questions/53976117/why-did-the-kafka-stream-fail-to-produce-data-after-a-long-time/54029181#54029181

Guozhang



On Thu, Nov 29, 2018 at 2:25 PM Jason Gustafson  wrote:

> Hey Guozhang,
>
> To clarify, the broker does not actually use the ApiVersion API for
> inter-broker communications. The use of an API and its corresponding
> version is controlled by `inter.broker.protocol.version`.
>
> Nevertheless, it sounds like we're on the same page about removing
> DescribeTransactionState. The impact of a dangling transaction is a little
> worse than what you describe though. Consumers with the read_committed
> isolation level will be stuck. Still, I think we agree that this case
> should be rare and we can reconsider for future work. Rather than
> preventing dangling transactions, perhaps we should consider options which
> allows us to detect them and recover. Anyway, this needs more thought. I
> will update the KIP.
>
> Best,
> Jason
>
> On Tue, Nov 27, 2018 at 6:51 PM Guozhang Wang  wrote:
>
> > 0. My original question is about the implementation details primarily,
> > since current the handling logic of the APIVersionResponse is simply "use
> > the highest supported version of the corresponding request", but if the
> > returned response from APIVersionRequest says "I don't even know about
> the
> > DescribeTransactionStateRequest at all", then we need additional logic
> for
> > the falling back logic. Currently this logic is embedded in NetworkClient
> > which is shared by all clients, so I'd like to avoid making this logic
> more
> > complicated.
> >
> > As for the general issue that a broker does not recognize a producer with
> > sequence number 0, here's my thinking: as you mentioned in the wiki, this
> > is only a concern for transactional producer since for idempotent
> producer
> > it can just bump the epoch and go. For transactional producer, even if
> the
> > producer request from a fenced producer gets accepted, its transaction
> will
> > never be committed and hence messages not exposed to read-committed
> > consumers as well. The drawback is though, 1) read-uncommitted consumers
> > will still read those messages, 2) unnecessary storage for those fenced
> > produce messages, but in practice should not accumulate to a large amount
> > since producer should soon try to commit and be told it is fenced and
> then
> > stop, 3) there will be no markers for those transactional messages ever.
> > Looking at the list and thinking about the likelihood it may happen
> > assuming we retain the producer up to transactional.id.timeout (default
> is
> > 7 days), I feel comfortable leaving it as is.
> >
> > Guozhang
> >
> > On Mon, Nov 26, 2018 at 6:09 PM Jason Gustafson 
> > wrote:
> >
> > > Hey Guozhang,
> > >
> > > Thanks for the comments. Responses below:
> > >
> > > 0. The new API is used between brokers, so we govern its usage using
> > > `inter.broker.protocol.version`. If the other broker hasn't upgraded,
> we
> > > will just fallback to the old logic, which is to accept the write. This
> > is
> > > similar to how we introduced the OffsetsForLeaderEpoch API. Does that
> > seem
> > > reasonable?
> > >
> > > To tell the truth, after digging this KIP up and reading it over, I am
> > > doubting how crucial this API is. It is attempting to protect a write
> > from
> > > a zombie which has just reset its sequence number after that producer
> had
> > > had its state cleaned up. However, one of the other improvements in
> this
> > > KIP is to maintain producer state beyond its retention in the log. I
> > think
> > > that makes this case sufficiently unlikely that we can leave it for
> > future
> > > work. I am not 100% sure this is the only scenario where transaction
> > state
> > > and log state can diverge anyway, so it would be better to consider
> this
> > > problem more generally. What do you think?
> > >
> > > 1. Thanks, from memory, the term changed after the first iteration.
> I'll
> > > make a pass and try to clarify usage.
> > > 2. I was attempting to handle some edge cases since this check would be
> > > asynchronous. In any case, if we drop this validation as suggested
> above,
> > > then we can ignore this.
> > >
> > > -Jason
> > >
> > >
> > >
> > > On Tue, Nov 13, 2018 at 6:23 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hello Jason, thanks for the great write-up.
> > > >
> > > > 0. One question about the migration plan: "The new
> GetTransactionState
> > > API
> > > > and the new version of the transaction state message 

Re: Suggestion to make 0 grace period the default for suppress

2019-01-03 Thread Matthias J. Sax
Kafka Streams does not change topic configs if a topic exists already.
Thus, there should be no impact on existing applications when they are
upgraded.

However, I share the backward compatibility concern in general.
Furthermore, for windowing, the retention period must be at least the window
size IIRC; thus, if we change it, we should set the default accordingly.

I still have my doubts whether a short retention time like this would be a
good out-of-the-box experience, because we might lose out-of-order data
that misses the window-end time...
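
For context, a minimal sketch (assuming the 2.1 Streams API) of what a user writes today to get prompt final results: grace has to be set explicitly on the window, otherwise the fallback grace derived from the one-day retention applies and suppress waits that long.

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressGraceSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> events = builder.stream("events");
        events.groupByKey()
              // Without an explicit grace(), the retention-derived fallback
              // applies and suppress emits only after that much time passes.
              .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(Duration.ZERO))
              .count()
              .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
              .toStream()
              .foreach((windowedKey, count) ->
                  System.out.println(windowedKey + " -> " + count));
    }
}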


-Matthias

On 1/3/19 9:05 PM, Boyang Chen wrote:
> Thanks for the proposal Jingguo. Guozhang when you mention to "change the
> default value of retention to 0 consistently", will we introduce any backward 
> incompatible issue since
> the retention cutoff might change on the changelog topics that are unexpected.
> 
> Boyang
> 
> 
> From: Guozhang Wang 
> Sent: Friday, January 4, 2019 3:16 AM
> To: dev
> Subject: Re: Suggestion to make 0 grace period the default for suppress
> 
> Thanks for reporting this Jingguo, personally I'd favor we change the
> default value of retention to 0 consistently, regardless of whether
> suppress is used, if we would ever consider changing it.
> 
> Since it is a public API change (i.e. changing the default value), it
> should be discussed and voted via a KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> 
> Are you interested in creating one and drive the discussion?
> 
> 
> Guozhang
> 
> On Sun, Dec 30, 2018 at 10:34 PM jingguo yao  wrote:
> 
>> [1] has the following code to demonstrate the usage of suppress method.
>>
>> KGroupedStream grouped = ...;
>> grouped
>>   .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
>>   .count()
>>   .suppress(Suppressed.untilWindowCloses(unbounded()))
>>   .filter((windowedUserId, count) -> count < 3)
>>   .toStream()
>>   .foreach((windowedUserId, count) ->
>> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
>>
>>
>> If I remove the grace method invocation, I will have a one day
>> retention period. The following code in
>> org.apache.kafka.streams.kstream.TimeWindows causes this behaviour:
>>
>> @SuppressWarnings("deprecation") // continuing to support
>> Windows#maintainMs/segmentInterval in fallback mode
>> @Override
>> public long gracePeriodMs() {
>>   // NOTE: in the future, when we remove maintainMs,
>>   // we should default the grace period to 24h to maintain the default
>> behavior,
>>   // or we can default to (24h - size) if you want to be super accurate.
>>   return grace != null ? grace.toMillis() : maintainMs() - size();
>> }
>>
>> I think that it is better to use a 0 grace period if
>> "suppress(Suppressed.untilWindowCloses(unbounded()))" exists. With the
>> suppress method invocation, people expect to see the final
>> window result when the window closes instead of waiting to see the result
>> after the one-day period. Even if we have some reasons similar to the ones
>> mentioned in the code comment, it is better to mention this behaviour
>> somewhere in the Kafka Streams documentation.
>>
>>
>> [1]
>> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-resu
>>
>>
>> --
>> Jingguo
>>
> 
> 
> --
> -- Guozhang
> 





Re: Suggestion to make 0 grace period the default for suppress

2019-01-03 Thread Boyang Chen
Thanks for the proposal, Jingguo. Guozhang, when you mention "changing the 
default value of retention to 0 consistently", would we introduce any 
backward-incompatible issue, since the retention cutoff might change on the 
changelog topics in unexpected ways?

Boyang


From: Guozhang Wang 
Sent: Friday, January 4, 2019 3:16 AM
To: dev
Subject: Re: Suggestion to make 0 grace period the default for suppress

Thanks for reporting this Jingguo, personally I'd favor we change the
default value of retention to 0 consistently, regardless of whether
suppress is used, if we would ever consider changing it.

Since it is a public API change (i.e. changing the default value), it
should be discussed and voted via a KIP:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Are you interested in creating one and drive the discussion?


Guozhang

On Sun, Dec 30, 2018 at 10:34 PM jingguo yao  wrote:

> [1] has the following code to demonstrate the usage of suppress method.
>
> KGroupedStream grouped = ...;
> grouped
>   .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
>   .count()
>   .suppress(Suppressed.untilWindowCloses(unbounded()))
>   .filter((windowedUserId, count) -> count < 3)
>   .toStream()
>   .foreach((windowedUserId, count) ->
> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
>
>
> If I remove the grace method invocation, I will have a one day
> retention period. The following code in
> org.apache.kafka.streams.kstream.TimeWindows causes this behaviour:
>
> @SuppressWarnings("deprecation") // continuing to support
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
>   // NOTE: in the future, when we remove maintainMs,
>   // we should default the grace period to 24h to maintain the default
> behavior,
>   // or we can default to (24h - size) if you want to be super accurate.
>   return grace != null ? grace.toMillis() : maintainMs() - size();
> }
>
> I think that it is better to use a 0 grace period if
> "suppress(Suppressed.untilWindowCloses(unbounded()))" exists. With the
> suppress method invocation, people expect to see the final
> window result when the window closes instead of waiting to see the result
> after the one-day period. Even if we have some reasons similar to the ones
> mentioned in the code comment, it is better to mention this behaviour
> somewhere in the Kafka Streams documentation.
>
>
> [1]
> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-resu
>
>
> --
> Jingguo
>


--
-- Guozhang


Re: Suggestion to make 0 grace period the default for suppress

2019-01-03 Thread Guozhang Wang
Thanks for reporting this, Jingguo. Personally, I'd favor changing the
default value of retention to 0 consistently, regardless of whether
suppress is used, if we would ever consider changing it.

Since it is a public API change (i.e. changing the default value), it
should be discussed and voted via a KIP:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

Are you interested in creating one and driving the discussion?


Guozhang

On Sun, Dec 30, 2018 at 10:34 PM jingguo yao  wrote:

> [1] has the following code to demonstrate the usage of suppress method.
>
> KGroupedStream grouped = ...;
> grouped
>   .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
>   .count()
>   .suppress(Suppressed.untilWindowCloses(unbounded()))
>   .filter((windowedUserId, count) -> count < 3)
>   .toStream()
>   .foreach((windowedUserId, count) ->
> sendAlert(windowedUserId.window(), windowedUserId.key(), count));
>
>
> If I remove the grace method invocation, I will have a one day
> retention period. The following code in
> org.apache.kafka.streams.kstream.TimeWindows causes this behaviour:
>
> @SuppressWarnings("deprecation") // continuing to support
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
>   // NOTE: in the future, when we remove maintainMs,
>   // we should default the grace period to 24h to maintain the default
> behavior,
>   // or we can default to (24h - size) if you want to be super accurate.
>   return grace != null ? grace.toMillis() : maintainMs() - size();
> }
>
> I think that it is better to use a 0 grace period if
> "suppress(Suppressed.untilWindowCloses(unbounded()))" exists. With the
> suppress method invocation, people expect to see the final
> window result when the window closes instead of waiting to see the result
> after the one-day period. Even if we have some reasons similar to the ones
> mentioned in the code comment, it is better to mention this behaviour
> somewhere in the Kafka Streams documentation.
>
>
> [1]
> http://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-final-resu
>
>
> --
> Jingguo
>


-- 
-- Guozhang


Re: [EXTERNAL] [VOTE] KIP-382 MirrorMaker 2.0

2019-01-03 Thread Ryanne Dolan
Happy New Year all! Current tally is:
+12 non-binding
+1 binding

Thanks for the support so far!
Ryanne

On Wed, Jan 2, 2019 at 5:26 AM Skrzypek, Jonathan 
wrote:

> +1 (non-binding)
>
> Jonathan Skrzypek
>
> -Original Message-
> From: Mickael Maison 
> Sent: 02 January 2019 11:10
> To: dev 
> Subject: Re: [EXTERNAL] [VOTE] KIP-382 MirrorMaker 2.0
>
> +1 (non-binding)
> Thanks Ryanne
>
> On Wed, Jan 2, 2019 at 4:47 AM McCaig, Rhys 
> wrote:
> >
> > +1 (non-binding). Fantastic work on the KIP Ryanne.
> >
> > > On Dec 25, 2018, at 9:10 AM, Stephane Maarek <
> kafka.tutori...@gmail.com> wrote:
> > >
> > > +1 ! Great stuff
> > >
> > > Stephane
> > >
> > > On Mon., 24 Dec. 2018, 12:07 pm Edoardo Comar  wrote:
> > >
> > >> +1 non-binding
> > >>
> > >> thanks for the KIP
> > >> --
> > >>
> > >> Edoardo Comar
> > >>
> > >> IBM Event Streams
> > >>
> > >>
> > >> Harsha  wrote on 21/12/2018 20:17:03:
> > >>
> > >>> From: Harsha 
> > >>> To: dev@kafka.apache.org
> > >>> Date: 21/12/2018 20:17
> > >>> Subject: Re: [VOTE] KIP-382 MirrorMaker 2.0
> > >>>
> > >>> +1 (binding).  Nice work Ryan.
> > >>> -Harsha
> > >>>
> > >>> On Fri, Dec 21, 2018, at 8:14 AM, Andrew Schofield wrote:
> >  +1 (non-binding)
> > 
> >  Andrew Schofield
> >  IBM Event Streams
> > 
> >  On 21/12/2018, 01:23, "Srinivas Reddy"
> >  
> > >> wrote:
> > 
> > +1 (non binding)
> > 
> > Thank you Ryan for the KIP, let me know if you need support in
> > >>> implementing
> > it.
> > 
> > -
> > Srinivas
> > 
> > - Typed on tiny keys. pls ignore typos.{mobile app}
> > 
> > 
> > On Fri, 21 Dec, 2018, 08:26 Ryanne Dolan
> >   > >> wrote:
> > 
> > > Thanks for the votes so far!
> > >
> > > Due to recent discussions, I've removed the high-level REST
> > >>> API from the
> > > KIP.
> > >
> > > On Thu, Dec 20, 2018 at 12:42 PM Paul Davidson
> > >>> 
> > > wrote:
> > >
> > >> +1
> > >>
> > >> Would be great to see the community build on the basic
> > >>> approach we took
> > >> with Mirus. Thanks Ryanne.
> > >>
> > >> On Thu, Dec 20, 2018 at 9:01 AM Andrew Psaltis
> > >>>  > >>
> > >> wrote:
> > >>
> > >>> +1
> > >>>
> > >>> Really looking forward to this and to helping in any way
> > >>> I can. Thanks
> > >> for
> > >>> kicking this off Ryanne.
> > >>>
> > >>> On Thu, Dec 20, 2018 at 10:18 PM Andrew Otto
> > >> 
> > > wrote:
> > >>>
> >  +1
> > 
> >  This looks like a huge project! Wikimedia would be
> > >>> very excited to
> > > have
> >  this. Thanks!
> > 
> >  On Thu, Dec 20, 2018 at 9:52 AM Ryanne Dolan
> > >>> 
> >  wrote:
> > 
> > > Hey y'all, please vote to adopt KIP-382 by replying +1
> > >> to this
> > >> thread.
> > >
> > > For your reference, here are the highlights of the
> > >> proposal:
> > >
> > > - Leverages the Kafka Connect framework and ecosystem.
> > > - Includes both source and sink connectors.
> > > - Includes a high-level driver that manages connectors
> > >> in a
> > > dedicated
> > > cluster.
> > > - High-level REST API abstracts over connectors
> > >>> between multiple
> > >> Kafka
> > > clusters.
> > > - Detects new topics, partitions.
> > > - Automatically syncs topic configuration between
> > >> clusters.
> > > - Manages downstream topic ACL.
> > > - Supports "active/active" cluster pairs, as well as
> > >>> any number of
> > >>> active
> > > clusters.
> > > - Supports cross-data center replication,
> > >>> aggregation, and other
> > >>> complex
> > > topologies.
> > > - Provides new metrics including end-to-end
> > >>> replication latency
> > >> across
> > > multiple data centers/clusters.
> > > - Emits offsets required to migrate consumers
> > >>> between clusters.
> > > - Tooling for offset translation.
> > > - MirrorMaker-compatible legacy mode.
> > >
> > > Thanks, and happy holidays!
> > > Ryanne
> > >
> > 
> > >>>
> > >>
> > >>
> > >> --
> > >> Paul Davidson
> > >> Principal Engineer, Ajna Team
> > >> Big Data & Monitoring
> > >>
> > >
> > 
> > 
> > >>>
> > >>
> > >> Unless stated otherwise above:
> > >> IBM United Kingdom Limited - Registered in England and Wales with
> > >> number 741598.
> > >> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> > >> PO6 3AU
> > >>
> >
>
> 
>
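For readers skimming the highlights quoted above, a minimal mm2.properties sketch in the configuration style proposed by KIP-382 might look like the following. The cluster aliases, hostnames, and exact property names here are illustrative of the proposal only, not a released API:

    # Two clusters, identified by alias; aliases and addresses are examples only.
    clusters = primary, backup
    primary.bootstrap.servers = primary-host:9092
    backup.bootstrap.servers = backup-host:9092

    # Replicate all topics from "primary" into "backup"; enabling the reverse
    # flow as well would give the "active/active" pairing described above.
    primary->backup.enabled = true
    primary->backup.topics = .*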

[jira] [Created] (KAFKA-7785) Move DefaultPartitionGrouper (and PartitionGrouper) to o.a.k.streams.processor.internals package

2019-01-03 Thread Jacek Laskowski (JIRA)
Jacek Laskowski created KAFKA-7785:
--

 Summary: Move DefaultPartitionGrouper (and PartitionGrouper) to 
o.a.k.streams.processor.internals package
 Key: KAFKA-7785
 URL: https://issues.apache.org/jira/browse/KAFKA-7785
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.1.0
Reporter: Jacek Laskowski


Since {{DefaultPartitionGrouper}} exists only for the purpose of the internal 
{{StreamsPartitionAssignor}}, it would make sense to have it in the 
{{org.apache.kafka.streams.processor.internals}} package.

I would also vote to move {{PartitionGrouper}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Use KafkaAdminClient to get broker dynamic config “leader.replication.throttled.rate” and “follower.replication.throttled.rate” ,but returned null

2019-01-03 Thread Rajini Sivaram
KafkaAdminClient#describeConfigs currently only returns values of
non-sensitive configs that are defined in `KafkaConfig`. Since
`leader.replication.throttled.rate` is not defined in `KafkaConfig`, it is
handled as a custom config and hence the value is not returned. We could,
however, change this behaviour for the three configs defined in
kafka.server.DynamicConfig, since we know that these are not sensitive
values. Can you open a JIRA?

Thanks,

Rajini

On Thu, Jan 3, 2019 at 2:34 AM tinawenqiao <315524...@qq.com> wrote:

> Hi, everyone: I found that KafkaAdminClient.describeConfigs can't return
> dynamic broker config values.
> First I use kafka-reassign-partitions.sh to reassign partitions and, at
> the same time, add a throttle as follows. Then I use describeConfigs to
> read the config values back, but I don't get the result I expected.
> Is this a bug, or is it working as designed?
>
>
>  Step 1: Reassign with throttle parameter
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2183
> --reassignment-json-file reassign.json --execute --throttle 3000
> --replica-alter-log-dirs-throttle 2000 --bootstrap-server localhost:19092
> Current partition replica assignment
>
> {"version":1,"partitions":[{"topic":"test-topic3","partition":0,"replicas":[115],"log_dirs":["any"]}]}
> Save this to use as the --reassignment-json-file option during rollback
> Warning: You must run Verify periodically, until the reassignment
> completes, to ensure the throttle is removed. You can also alter the
> throttle by rerunning the Execute command passing a new value.
> The inter-broker throttle limit was set to 3000 B/s
> The replica-alter-dir throttle limit was set to 2000 B/s
> Successfully started reassignment of partitions.
>
>
>
> Step 2: I can get the dynamic broker config in ZooKeeper.
> [zk: localhost:2183(CONNECTED) 47] get /config/brokers/111
>
> {"version":1,"config":{"replica.alter.log.dirs.io.max.bytes.per.second":"2000","leader.replication.throttled.rate":"3000","follower.replication.throttled.rate":"3000"}}
> cZxid = 0x1a99
> ctime = Fri Dec 21 13:59:20 CST 2018
> mZxid = 0x2ebb
> mtime = Thu Jan 03 10:06:56 CST 2019
> pZxid = 0x1a99
> cversion = 0
> dataVersion = 19
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 168
> numChildren = 0
>
>
>
>
> Step 3: Use org.apache.kafka.clients.admin.KafkaAdminClient to describe
> broker 111's config; the results are all null:
> leader.replication.throttled.rate = null, isReadOnly:true,
> source:DYNAMIC_BROKER_CONFIG
>
> follower.replication.throttled.rate = null, isReadOnly:true,
> source:DYNAMIC_BROKER_CONFIG
>
> replica.alter.log.dirs.io.max.bytes.per.second = null, isReadOnly:true,
> source:DYNAMIC_BROKER_CONFIG
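
For reference, a minimal sketch of the Step 3 lookup (the class name and output formatting are illustrative; the bootstrap address and broker id are taken from the report above):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.Config;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class DescribeBrokerThrottles {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Describe the per-broker configs of broker 111.
                ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "111");
                Map<ConfigResource, Config> configs =
                        admin.describeConfigs(Collections.singleton(broker)).all().get();

                for (ConfigEntry entry : configs.get(broker).entries()) {
                    // Only print the dynamically set per-broker entries.
                    if (entry.source() == ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG) {
                        System.out.printf("%s = %s, isReadOnly:%b, source:%s%n",
                                entry.name(), entry.value(), entry.isReadOnly(), entry.source());
                    }
                }
            }
        }
    }

As reported above, the three throttle entries come back with source DYNAMIC_BROKER_CONFIG but a null value, because they are treated as custom configs rather than configs defined in KafkaConfig.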


[jira] [Created] (KAFKA-7784) Producer & message format: better management when inter.broker.protocol.version is overridden

2019-01-03 Thread Hervé RIVIERE (JIRA)
Hervé RIVIERE created KAFKA-7784:


 Summary: Producer & message format: better management when 
inter.broker.protocol.version is overridden
 Key: KAFKA-7784
 URL: https://issues.apache.org/jira/browse/KAFKA-7784
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.0.1
Reporter: Hervé RIVIERE


With the following setup: 
 * Java producer v2.0.1
 * Brokers 2.0.1 with *message format V1* (i.e. with 
log.message.format.version=0.10.2 & inter.broker.protocol.version=0.10.2)

the producer sends messages in message format V2, so a needless down-conversion 
to the V1 message format is triggered on the broker side.

An improvement to avoid wasting CPU resources on the broker side would be for 
the broker to advertise its current message format version to clients 
(currently the broker only advertises the API versions available).

With that message format version information, the producer would be able to 
prefer a specific message format and avoid down-conversion on the broker side.
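
For reference, a minimal broker-side sketch of the setup described above (the property values are taken from this report; presenting them as server.properties entries is an assumption):

    # Brokers run 2.0.1 but are pinned to the 0.10.2 protocol and message format,
    # so V2 record batches from a 2.0.1 producer are down-converted to V1 on the broker.
    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2

The cost of that conversion should be visible on the brokers via the 
kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec metric.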



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-2.1-jdk8 #93

2019-01-03 Thread Apache Jenkins Server
See 


Changes:

[cshapi] MINOR: Upgrade to Gradle 4.10.3

--
[...truncated 736.97 KB...]
kafka.api.GroupEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.GroupEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.GroupEndToEndAuthorizationTest > testNoProduceWithDescribeAcl STARTED

kafka.api.GroupEndToEndAuthorizationTest > testNoProduceWithDescribeAcl PASSED

kafka.api.GroupEndToEndAuthorizationTest > testProduceConsumeViaSubscribe 
STARTED

kafka.api.GroupEndToEndAuthorizationTest > testProduceConsumeViaSubscribe PASSED

kafka.api.GroupEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl 
STARTED
ERROR: Could not install GRADLE_4_8_1_HOME
java.lang.NullPointerException
at 
hudson.plugins.toolenv.ToolEnvBuildWrapper$1.buildEnvVars(ToolEnvBuildWrapper.java:46)
at hudson.model.AbstractBuild.getEnvironment(AbstractBuild.java:881)
at hudson.plugins.git.GitSCM.getParamExpandedRepos(GitSCM.java:483)
at 
hudson.plugins.git.GitSCM.compareRemoteRevisionWithImpl(GitSCM.java:692)
at hudson.plugins.git.GitSCM.compareRemoteRevisionWith(GitSCM.java:657)
at hudson.scm.SCM.compareRemoteRevisionWith(SCM.java:400)
at hudson.scm.SCM.poll(SCM.java:417)
at hudson.model.AbstractProject._poll(AbstractProject.java:1390)
at hudson.model.AbstractProject.poll(AbstractProject.java:1293)
at hudson.triggers.SCMTrigger$Runner.runPolling(SCMTrigger.java:603)
at hudson.triggers.SCMTrigger$Runner.run(SCMTrigger.java:649)
at 
hudson.util.SequentialExecutionQueue$QueueEntry.run(SequentialExecutionQueue.java:119)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

kafka.api.GroupEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl 
PASSED

kafka.api.test.ProducerCompressionTest > testCompression[0 compressionType = 
none] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[0 compressionType = 
none] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[1 compressionType = 
gzip] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[1 compressionType = 
gzip] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[2 compressionType = 
snappy] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[2 compressionType = 
snappy] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[3 compressionType = 
lz4] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[3 compressionType = 
lz4] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[4 compressionType = 
zstd] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[4 compressionType = 
zstd] PASSED

kafka.api.GroupAuthorizerIntegrationTest > testCommitWithTopicWrite STARTED

kafka.api.GroupAuthorizerIntegrationTest > testCommitWithTopicWrite PASSED

kafka.api.GroupAuthorizerIntegrationTest > shouldInitTransactionsWhenAclSet 
STARTED

kafka.api.GroupAuthorizerIntegrationTest > shouldInitTransactionsWhenAclSet 
PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testDeleteGroupApiWithNoDeleteGroupAcl STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testDeleteGroupApiWithNoDeleteGroupAcl PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicAndGroupRead PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testPatternSubscriptionWithNoTopicAccess STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testPatternSubscriptionWithNoTopicAccess PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testCreatePermissionOnClusterToWriteToNonExistentTopic STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testCreatePermissionOnClusterToWriteToNonExistentTopic PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testCreatePermissionOnClusterToReadFromNonExistentTopic STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testCreatePermissionOnClusterToReadFromNonExistentTopic PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testUnauthorizedDeleteTopicsWithDescribe STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
testUnauthorizedDeleteTopicsWithDescribe PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException STARTED

kafka.api.GroupAuthorizerIntegrationTest > 
shouldSuccessfullyAbortTransactionAfterTopicAuthorizationException PASSED

kafka.api.GroupAuthorizerIntegrationTest > 
testSendOffsetsWithNoConsumerGroupWriteAccess STARTED