Re: [VOTE] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-11-03 Thread Anna Povzner
Hi David,

You are right, using a token bucket exposes the metric for the number of
remaining tokens in the bucket. Since we also want to evaluate using a
token bucket for bandwidth & request throttling, it would be better to have
that discussion in a separate KIP. This KIP does not implement token
bucket throttling, and I made sure that the KIP wiki does not mention it
either. Hopefully, I haven't added any confusion.

As David Mao is implementing the per-IP part of this KIP, he will update the
KIP wiki and notify this thread with a couple more details that were not
mentioned in the KIP wiki (e.g., per-IP rate metrics exposed when
throttling).

Thanks,
Anna

On Wed, Sep 2, 2020 at 1:06 PM David Jacot  wrote:

> Hi Anna,
>
> Thanks for the update.
>
> If you use Token Bucket, it will expose another metric which reports the
> number of remaining tokens in the bucket, in addition to the current rate
> metric. It would be great to add it in the metrics section of the KIP as
> well
> for completeness.
>
> Best,
> David
>
> On Tue, Aug 11, 2020 at 4:28 AM Anna Povzner  wrote:
>
> > Hi All,
> >
> > I wanted to let everyone know that we would like to make the following
> > changes to the KIP:
> >
> >    1. Expose connection acceptance rate metrics (broker-wide and
> >       per-listener) and per-listener average throttle time metrics for
> >       better observability and debugging.
> >    2. KIP-599 introduced a new implementation of MeasurableStat that
> >       implements a token bucket, which improves rate throttling for
> >       bursty workloads (KAFKA-10162). We would like to use this same
> >       mechanism for connection accept rate throttling.
> >
> >
> > I updated the KIP to reflect these changes.
> >
> > Let me know if you have any concerns.
> >
> > Thanks,
> >
> > Anna
> >
> >
> > On Thu, May 21, 2020 at 5:42 PM Anna Povzner  wrote:
> >
> > > The vote for KIP-612 has passed with 3 binding and 3 non-binding +1s,
> and
> > > no objections.
> > >
> > >
> > > Thanks everyone for reviews and feedback,
> > >
> > > Anna
> > >
> > > On Tue, May 19, 2020 at 2:41 AM Rajini Sivaram <
> rajinisiva...@gmail.com>
> > > wrote:
> > >
> > >> +1 (binding)
> > >>
> > >> Thanks for the KIP, Anna!
> > >>
> > >> Regards,
> > >>
> > >> Rajini
> > >>
> > >>
> > >> On Tue, May 19, 2020 at 9:32 AM Alexandre Dupriez <
> > >> alexandre.dupr...@gmail.com> wrote:
> > >>
> > >> > +1 (non-binding)
> > >> >
> > >> > Thank you for the KIP!
> > >> >
> > >> >
> > >> > On Tue, May 19, 2020 at 07:57, David Jacot wrote:
> > >> > >
> > >> > > +1 (non-binding)
> > >> > >
> > >> > > Thanks for the KIP, Anna!
> > >> > >
> > >> > > On Tue, May 19, 2020 at 7:12 AM Satish Duggana <
> > >> satish.dugg...@gmail.com
> > >> > >
> > >> > > wrote:
> > >> > >
> > >> > > > +1 (non-binding)
> > >> > > > Thanks Anna for the nice feature to control the connection
> > creation
> > >> > rate
> > >> > > > from the clients.
> > >> > > >
> > >> > > > On Tue, May 19, 2020 at 8:16 AM Gwen Shapira  >
> > >> > wrote:
> > >> > > >
> > >> > > > > +1 (binding)
> > >> > > > >
> > >> > > > > Thank you for driving this, Anna
> > >> > > > >
> > >> > > > > On Mon, May 18, 2020 at 4:55 PM Anna Povzner <
> a...@confluent.io
> > >
> > >> > wrote:
> > >> > > > >
> > >> > > > > > Hi All,
> > >> > > > > >
> > >> > > > > > I would like to start the vote on KIP-612: Ability to limit
> > >> > connection
> > >> > > > > > creation rate on brokers.
> > >> > > > > >
> > >> > > > > > For reference, here is the KIP wiki:
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
> > >> > > > > >
> > >> > > > > > And discussion thread:
> > >> > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> >
> > >>
> >
> https://lists.apache.org/thread.html/r61162661fa307d0bc5c8326818bf223a689c49e1c828c9928ee26969%40%3Cdev.kafka.apache.org%3E
> > >> > > > > >
> > >> > > > > > Thanks,
> > >> > > > > >
> > >> > > > > > Anna
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > Gwen Shapira
> > >> > > > > Engineering Manager | Confluent
> > >> > > > > 650.450.2760 | @gwenshap
> > >> > > > > Follow us: Twitter | blog
> > >> > > > >
> > >> > > >
> > >> >
> > >>
> > >
> >
>


Re: [DISCUSS] Apache Kafka 2.7.0 release

2020-10-06 Thread Anna Povzner
Hi Bill,

Regarding KIP-612, only the first half of the KIP will get into the 2.7
release: broker-wide and per-listener connection rate limits, including the
corresponding configs and metrics (KAFKA-10023). I see that the table in the
release plan tags KAFKA-10023 as "old"; I'm not sure what that refers to. Note
that while KIP-612 was approved prior to the 2.6 release, none of the
implementation went into 2.6.

The second half of the KIP, which adds per-IP connection rate limiting
(KAFKA-10024), will need to be postponed until the following release.

Thanks,
Anna

On Tue, Oct 6, 2020 at 2:30 PM Bill Bejeck  wrote:

> Hi Kowshik,
>
> Given that the new feature is contained in the PR and the tooling is
> follow-on work (minor work that's not part of the submitted PR), I think
> this is fine.
>
> Thanks,
> Bill
>
> On Tue, Oct 6, 2020 at 5:00 PM Kowshik Prakasam 
> wrote:
>
> > Hey Bill,
> >
> > For KIP-584, we are in the
> > process of reviewing/merging the write path PR into AK trunk:
> > https://github.com/apache/kafka/pull/9001. As far as the KIP goes, this
> > PR
> > is a major milestone. The PR merge will hopefully be done before EOD
> > tomorrow in time for the feature freeze. Beyond this PR, a couple of
> > things are left to be completed for this KIP: (1) tooling support and
> > (2) implementing support for feature version deprecation in the broker.
> > In particular,
> (1)
> > is important for this KIP and the code changes are external to the broker
> > (since it is a separate tool we intend to build). As of now, we won't be
> > able to merge the tooling changes before feature freeze date. Would it be
> > ok to merge the tooling changes before code freeze on 10/22? The tooling
> > requirements are explained here:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-584
> > 
> >
> >
> %3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Toolingsupport
> >
> > I would love to hear thoughts from Boyang and Jun as well.
> >
> >
> > Thanks,
> > Kowshik
> >
> >
> >
> > On Mon, Oct 5, 2020 at 3:29 PM Bill Bejeck  wrote:
> >
> > > Hi John,
> > >
> > > I've updated the list of expected KIPs for 2.7.0 with KIP-478.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Mon, Oct 5, 2020 at 11:26 AM John Roesler 
> > wrote:
> > >
> > > > Hi Bill,
> > > >
> > > > Sorry about this, but I've just noticed that KIP-478 is
> > > > missing from the list. The url is:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > >
> > > > The KIP was accepted a long time ago, and the implementation
> > > > has been trickling in since 2.6 branch cut. However, most of
> > > > the public API implementation is done now, so I think at
> > > > this point, we can call it "released in 2.7.0". I'll make
> > > > sure it's done by feature freeze.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Thu, 2020-10-01 at 13:49 -0400, Bill Bejeck wrote:
> > > > > All,
> > > > >
> > > > > With the KIP acceptance deadline passing yesterday, I've updated
> the
> > > > > planned KIP content section of the 2.7.0 release plan
> > > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158872629
> > > > >
> > > > > .
> > > > >
> > > > > Removed proposed KIPs for 2.7.0 not getting approval
> > > > >
> > > > >1. KIP-653
> > > > ><
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2
> > > > >
> > > > >2. KIP-608
> > > > ><
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-608+-+Expose+Kafka+Metrics+in+Authorizer
> > > > >
> > > > >3. KIP-508
> > > > ><
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-508%3A+Make+Suppression+State+Queriable
> > > > >
> > > > >
> > > > > KIPs added
> > > > >
> > > > >1. KIP-671
> > > > ><
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Introduce+Kafka+Streams+Specific+Uncaught+Exception+Handler
> > > > >
> > > > >
> > > > >
> > > > > Please let me know if I've missed anything.
> > > > >
> > > > > Thanks,
> > > > > Bill
> > > > >
> > > > > On Thu, Sep 24, 2020 at 1:47 PM Bill Bejeck 
> > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > Just a reminder that the KIP freeze is next Wednesday, September
> > > 30th.
> > > > > > Any KIP aiming to go in the 2.7.0 release needs to be accepted by
> > > this
> > > > date.
> > > > > >
> > > > > > Thanks,
> > > > > > Bill
> > > > > >
> > > > > > On Tue, Sep 22, 2020 at 12:11 PM Bill Bejeck 
> > > > wrote:
> > > > > >
> > > > > > > Boyan,
> > > > > > >
> > > > > > > Done. Thanks for the heads up.
> > > > > > >
> > > > > > > -Bill
> > > > > > >
> > > > > > > On Mon, Sep 21, 2020 at 6:36 PM Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Bill,
> > > > > > 

[jira] [Created] (KAFKA-10509) Add metric to track throttle time due to hitting connection rate quota

2020-09-21 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10509:


 Summary: Add metric to track throttle time due to hitting 
connection rate quota
 Key: KAFKA-10509
 URL: https://issues.apache.org/jira/browse/KAFKA-10509
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Anna Povzner
Assignee: Anna Povzner
 Fix For: 2.7.0


See KIP-612.

 

kafka.network:type=socket-server-metrics,name=connection-accept-throttle-time,listener=\{listenerName}
 * Type: SampledStat.Avg
 * Description: Average throttle time due to violating per-listener or 
broker-wide connection acceptance rate quota on a given listener.
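
As a rough illustration of what an averaged, sampled metric reports, here is a toy windowed average in Python. Note this is a simplification for illustration only: Kafka's SampledStat.Avg is time-windowed, whereas this sketch is count-windowed, and none of the names below are Kafka's actual classes.

```python
from collections import deque

class WindowedAvg:
    """Average over the most recent `max_samples` recorded values --
    a simplified, count-windowed stand-in for a sampled Avg stat."""
    def __init__(self, max_samples=10):
        self.samples = deque(maxlen=max_samples)

    def record(self, throttle_ms):
        self.samples.append(throttle_ms)

    def measure(self):
        return sum(self.samples) / len(self.samples) if self.samples else 0.0

avg = WindowedAvg(max_samples=3)
for t in (0, 200, 400, 600):    # observed throttle times in ms
    avg.record(t)
print(avg.measure())   # 400.0 -- only the last three samples count
```

Because old samples age out of the window, the metric reflects recent throttling pressure rather than a lifetime average.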



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10458) Need a way to update quota for TokenBucket registered with Sensor

2020-09-21 Thread Anna Povzner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner resolved KAFKA-10458.
--
Resolution: Fixed

> Need a way to update quota for TokenBucket registered with Sensor
> -
>
> Key: KAFKA-10458
> URL: https://issues.apache.org/jira/browse/KAFKA-10458
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>    Reporter: Anna Povzner
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7.0
>
>
> For the Rate() metric with quota config, we update the quota by updating the 
> config of the KafkaMetric. However, that is not enough for TokenBucket, 
> because it uses the quota config on record() to properly calculate the number 
> of tokens. The Sensor passes the config stored in the corresponding 
> StatAndConfig, which currently never changes. This means that after updating 
> the quota via KafkaMetric.config (our current and only method), the Sensor 
> will record the value using the old quota but then measure the value to check 
> for quota violation using the new quota value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10458) Need a way to update quota for TokenBucket registered with Sensor

2020-09-02 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10458:


 Summary: Need a way to update quota for TokenBucket registered 
with Sensor
 Key: KAFKA-10458
 URL: https://issues.apache.org/jira/browse/KAFKA-10458
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Anna Povzner
Assignee: Anna Povzner
 Fix For: 2.7.0


For the Rate() metric with quota config, we update the quota by updating the 
config of the KafkaMetric. However, that is not enough for TokenBucket, because 
it uses the quota config on record() to properly calculate the number of 
tokens. The Sensor passes the config stored in the corresponding StatAndConfig, 
which currently never changes. This means that after updating the quota via 
KafkaMetric.config (our current and only method), the Sensor will record the 
value using the old quota but then measure the value to check for quota 
violation using the new quota value.
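
The stale-config interaction described above can be sketched in a few lines of Python. These are toy stand-ins, not Kafka's actual classes: `QuotaConfig`, `TokenBucket`, and `Sensor` here only illustrate how a stat whose behavior depends on the quota keeps using the config snapshot captured at registration, so updating only the metric-side config leaves the two out of sync.

```python
class QuotaConfig:
    def __init__(self, quota_per_sec):
        self.quota_per_sec = quota_per_sec

class TokenBucket:
    """Toy stat whose bucket capacity depends on the quota seen at record() time."""
    def __init__(self, burst_seconds):
        self.burst_seconds = burst_seconds

    def capacity(self, config):
        return config.quota_per_sec * self.burst_seconds

class Sensor:
    """Holds a per-stat config snapshot, like the StatAndConfig described above."""
    def __init__(self, stat, config):
        self.stat = stat
        self.stat_config = config   # never refreshed by metric-side config updates

stat = TokenBucket(burst_seconds=10)
sensor = Sensor(stat, QuotaConfig(quota_per_sec=1))

# Quota update applied only on the metric side (the "KafkaMetric.config" path):
metric_config = QuotaConfig(quota_per_sec=5)

# record() sizes the bucket with the stale snapshot...
recorded_capacity = stat.capacity(sensor.stat_config)   # 10 tokens
# ...while the violation check uses the fresh config:
checked_capacity = stat.capacity(metric_config)         # 50 tokens
print(recorded_capacity, checked_capacity)   # 10 50 -- inconsistent
```

The fix the ticket asks for is a way to propagate the quota update to the sensor-side snapshot as well, so both paths see the same value.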



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-08-10 Thread Anna Povzner
Hi All,

I wanted to let everyone know that we would like to make the following
changes to the KIP:

   1. Expose connection acceptance rate metrics (broker-wide and per-listener)
      and per-listener average throttle time metrics for better observability
      and debugging.

   2. KIP-599 introduced a new implementation of MeasurableStat that
      implements a token bucket, which improves rate throttling for bursty
      workloads (KAFKA-10162). We would like to use this same mechanism for
      connection accept rate throttling.
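
For readers unfamiliar with the mechanism referenced in item 2, here is a minimal token-bucket sketch (Python for illustration; this shows the general algorithm, not Kafka's TokenBucket class): idle time earns credit up to a burst cap, so a post-idle burst is admitted without borrowing from the future.

```python
class TokenBucket:
    """Minimal token-bucket rate limiter: tokens accrue at `rate` per
    second up to `burst`; each event spends one token."""

    def __init__(self, rate, burst, now=0.0):
        self.rate = rate          # steady-state allowance (tokens/sec)
        self.burst = burst        # maximum accumulated credit
        self.tokens = burst       # start full: idle time has earned credit
        self.last = now

    def try_acquire(self, now, n=1):
        # Refill based on elapsed time, capped at the burst size.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

bucket = TokenBucket(rate=2.0, burst=10)
# A burst after an idle period is served from accumulated tokens...
accepted = sum(bucket.try_acquire(now=0.0) for _ in range(12))
print(accepted)   # 10 -- the burst capacity
# ...after which arrivals are limited to the steady rate.
print(bucket.try_acquire(now=0.5))   # True: 0.5s * 2 tokens/s = 1 token
print(bucket.try_acquire(now=0.5))   # False: no credit left
```

This is why the thread prefers token-bucket behavior for bursty workloads: excess work is rejected (or delayed) immediately at the burst boundary instead of producing long borrowed-from-the-future throttle delays.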


I updated the KIP to reflect these changes.

Let me know if you have any concerns.

Thanks,

Anna


On Thu, May 21, 2020 at 5:42 PM Anna Povzner  wrote:

> The vote for KIP-612 has passed with 3 binding and 3 non-binding +1s, and
> no objections.
>
>
> Thanks everyone for reviews and feedback,
>
> Anna
>
> On Tue, May 19, 2020 at 2:41 AM Rajini Sivaram 
> wrote:
>
>> +1 (binding)
>>
>> Thanks for the KIP, Anna!
>>
>> Regards,
>>
>> Rajini
>>
>>
>> On Tue, May 19, 2020 at 9:32 AM Alexandre Dupriez <
>> alexandre.dupr...@gmail.com> wrote:
>>
>> > +1 (non-binding)
>> >
>> > Thank you for the KIP!
>> >
>> >
>> > On Tue, May 19, 2020 at 07:57, David Jacot wrote:
>> > >
>> > > +1 (non-binding)
>> > >
>> > > Thanks for the KIP, Anna!
>> > >
>> > > On Tue, May 19, 2020 at 7:12 AM Satish Duggana <
>> satish.dugg...@gmail.com
>> > >
>> > > wrote:
>> > >
>> > > > +1 (non-binding)
>> > > > Thanks Anna for the nice feature to control the connection creation
>> > rate
>> > > > from the clients.
>> > > >
>> > > > On Tue, May 19, 2020 at 8:16 AM Gwen Shapira 
>> > wrote:
>> > > >
>> > > > > +1 (binding)
>> > > > >
>> > > > > Thank you for driving this, Anna
>> > > > >
>> > > > > On Mon, May 18, 2020 at 4:55 PM Anna Povzner 
>> > wrote:
>> > > > >
>> > > > > > Hi All,
>> > > > > >
>> > > > > > I would like to start the vote on KIP-612: Ability to limit
>> > connection
>> > > > > > creation rate on brokers.
>> > > > > >
>> > > > > > For reference, here is the KIP wiki:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
>> > > > > >
>> > > > > > And discussion thread:
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> >
>> https://lists.apache.org/thread.html/r61162661fa307d0bc5c8326818bf223a689c49e1c828c9928ee26969%40%3Cdev.kafka.apache.org%3E
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Anna
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > Gwen Shapira
>> > > > > Engineering Manager | Confluent
>> > > > > 650.450.2760 | @gwenshap
>> > > > > Follow us: Twitter | blog
>> > > > >
>> > > >
>> >
>>
>


[jira] [Created] (KAFKA-10157) Multiple tests failed due to "Failed to process feature ZK node change event"

2020-06-11 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10157:


 Summary: Multiple tests failed due to "Failed to process feature 
ZK node change event"
 Key: KAFKA-10157
 URL: https://issues.apache.org/jira/browse/KAFKA-10157
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Multiple tests failed due to "Failed to process feature ZK node change event". 
Looks like a result of merging this PR: 
[https://github.com/apache/kafka/pull/8680]

Note that running tests without `--info` gives output like this one: 
{quote}Process 'Gradle Test Executor 36' finished with non-zero exit value 1
{quote}
kafka.network.DynamicConnectionQuotaTest failed:
{quote}
kafka.network.DynamicConnectionQuotaTest > testDynamicConnectionQuota STANDARD_OUT
[2020-06-11 20:52:42,596] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
    at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
    at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147)
{quote}
 

kafka.api.CustomQuotaCallbackTest failed:

{quote}
[2020-06-11 21:07:36,745] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
    at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
    at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at scala.util.control.Exception$Catch.apply(Exception.scala:227)
    at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{quote}
 

kafka.server.DynamicBrokerReconfigurationTest failed:

{quote}
[2020-06-11 21:13:01,207] ERROR [feature-zk-node-event-process-thread]: Failed to process feature ZK node change event. The broker will eventually exit. (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread:76)
java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2090)
    at java.base/java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:433)
    at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.$anonfun$doWork$1(FinalizedFeatureChangeListener.scala:147)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at scala.util.control.Exception$Catch.apply(Exception.scala:227)
    at kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread.doWork(FinalizedFeatureChangeListener.scala:147)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{quote}
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [VOTE] KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-05 Thread Anna Povzner
+1 (not binding)

Thanks for the KIP!

-Anna

On Thu, Jun 4, 2020 at 8:26 AM Mickael Maison 
wrote:

> +1 (binding)
> Thanks David for looking into this important issue
>
> On Thu, Jun 4, 2020 at 3:59 PM Tom Bentley  wrote:
> >
> > +1 (non binding).
> >
> > Thanks!
> >
> > On Wed, Jun 3, 2020 at 3:51 PM Rajini Sivaram 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks for the KIP, David!
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Sun, May 31, 2020 at 3:29 AM Gwen Shapira 
> wrote:
> > >
> > > > +1 (binding)
> > > >
> > > > Looks great. Thank you for the in-depth design and discussion.
> > > >
> > > > On Fri, May 29, 2020 at 7:58 AM David Jacot 
> wrote:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I'd like to start the vote for KIP-599 which proposes a new quota
> to
> > > > > throttle create topic, create partition, and delete topics
> operations
> > > to
> > > > > protect the Kafka controller:
> > > > >
> > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations
> > > > >
> > > > > Please, let me know what you think.
> > > > >
> > > > > Cheers,
> > > > > David
> > > > >
> > > >
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Engineering Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > >
>


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-05 Thread Anna Povzner
Hi David,

The KIP looks good to me. I am going to the voting thread...

Hi Jun,

Yes, exactly. That's a separate thing from this KIP, so I'm working on the fix.

Thanks,
Anna

On Fri, Jun 5, 2020 at 4:36 PM Jun Rao  wrote:

> Hi, Anna,
>
> Thanks for the comment. For the problem that you described, perhaps we need
> to make the quota checking and recording more atomic?
>
> Hi, David,
>
> Thanks for the updated KIP.  Looks good to me now. Just one minor comment
> below.
>
> 30. controller_mutations_rate: For topic creation and deletion, is the rate
> accumulated at the topic or partition level? It would be useful to make it
> clear in the wiki.
>
> Jun
>
> On Fri, Jun 5, 2020 at 7:23 AM David Jacot  wrote:
>
> > Hi Anna and Jun,
> >
> > You are right. We should allocate up to the quota for each old sample.
> >
> > I have revamped the Throttling Algorithm section to better explain our
> > thought process and the token bucket inspiration.
> >
> > I have also added a chapter with few guidelines about how to define
> > the quota. There is no magic formula for this but I give few insights.
> > I don't have specific numbers that can be used out of the box so I
> > think that it is better to not put any for the time being. We can always
> > complement later on in the documentation.
> >
> > Please, take a look and let me know what you think.
> >
> > Cheers,
> > David
> >
> > On Fri, Jun 5, 2020 at 8:37 AM Anna Povzner  wrote:
> >
> > > Hi David and Jun,
> > >
> > > I dug a bit deeper into the Rate implementation, and wanted to confirm
> > that
> > > I do believe that the token bucket behavior is better for the reasons
> we
> > > already discussed but wanted to summarize. The main difference between
> > Rate
> > > and token bucket is that the Rate implementation allows a burst by
> > > borrowing from the future, whereas a token bucket allows a burst by
> using
> > > accumulated tokens from the previous idle period. Using accumulated
> > tokens
> > > smoothes out the rate measurement in general. Configuring a large burst
> > > requires configuring a large quota window, which causes long delays for
> > > bursty workload, due to borrowing credits from the future. Perhaps it
> is
> > > useful to add a summary in the beginning of the Throttling Algorithm
> > > section?
> > >
> > > In my previous email, I mentioned the issue we observed with the
> > bandwidth
> > > quota, where a low quota (1MB/s per broker) was limiting bandwidth
> > visibly
> > > below the quota. I thought it was strictly the issue with the Rate
> > > implementation as well, but I found a root cause to be different but
> > > amplified by the Rate implementation (long throttle delays of requests
> > in a
> > > burst). I will describe it here for completeness using the following
> > > example:
> > >
> > >-
> > >
> > >Quota = 1MB/s, default window size and number of samples
> > >-
> > >
> > >Suppose there are 6 connections (maximum 6 outstanding requests),
> and
> > >each produce request is 5MB. If all requests arrive in a burst, the
> > > last 4
> > >requests (20MB over 10MB allowed in a window) may get the same
> > throttle
> > >time if they are processed concurrently. We record the rate under
> the
> > > lock,
> > >but then calculate throttle time separately after that. So, for each
> > >request, the observed rate could be 3MB/s, and each request gets
> > > throttle
> > >delay = 20 seconds (instead of 5, 10, 15, 20 respectively). The
> delay
> > is
> > >longer than the total rate window, which results in lower bandwidth
> > than
> > >the quota. Since all requests got the same delay, they will also
> > arrive
> > > in
> > >a burst, which may also result in longer delay than necessary. It
> > looks
> > >pretty easy to fix, so I will open a separate JIRA for it. This can
> be
> > >additionally mitigated by token bucket behavior.
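
The delay arithmetic in the example above can be reproduced with the usual throttle-time formulation, delay = (observed - quota) / quota * window. This is a sketch of the calculation described in the thread (assumed window of 10s at a 1MB/s quota), not the exact broker code:

```python
def throttle_ms(observed_rate, quota, window_ms):
    """Delay needed for the observed rate to fall back to the quota,
    assuming the rate is measured over `window_ms`."""
    return max(0.0, (observed_rate - quota) / quota * window_ms)

quota_mb_s = 1.0
window_ms = 10_000.0   # e.g. 10 x 1s samples

# Four concurrent 5MB requests all observe ~3MB/s, because the rate is
# recorded under the lock but the delay is computed afterwards:
same_delay = throttle_ms(3.0, quota_mb_s, window_ms)
print(same_delay / 1000)   # 20.0 seconds for each of the four requests

# Sequential processing would instead observe increasing rates
# (1.5, 2.0, 2.5, 3.0 MB/s) and get staggered delays of 5, 10, 15, 20s,
# keeping the realized bandwidth at the quota.
```

Since 20 seconds exceeds the whole rate window, all four requests sit out longer than necessary and then return in another burst, which is the under-quota bandwidth effect described above.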
> > >
> > >
> > > For the algorithm "So instead of having one sample equal to 560 in the
> > last
> > > window, we will have 100 samples equal to 5.6.", I agree with Jun. I
> > would
> > > allocate 5 per each old sample that is still in the overall window. It
> > > would be a bit larger granularity than the pure token bucket (we lose 5
> > > units / muta

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-05 Thread Anna Povzner
> was set.
> > >
> > > On the other hand, if validateOnly is really about validating only some
> > > aspects of the request (which maybe is what the name implies), then we
> > > should clarify in the Javadoc that the quota is not included in the
> > > validation.
> > >
> > > On balance, I agree with what you're proposing, since the extra utility
> > of
> > > including the quota in the validation seems to be not worth the extra
> > > complication for the retry.
> > >
> > > Thanks,
> > >
> > > Tom
> > >
> > >
> > >
> > > On Thu, Jun 4, 2020 at 3:32 PM David Jacot 
> wrote:
> > >
> > > > Hi Tom,
> > > >
> > > > That's a good question. As the validation does not create any load on
> > the
> > > > controller, I was planning to do it without checking the quota at
> all.
> > > Does
> > > > that
> > > > sound reasonable?
> > > >
> > > > Best,
> > > > David
> > > >
> > > > On Thu, Jun 4, 2020 at 4:23 PM David Jacot 
> > wrote:
> > > >
> > > > > Hi Jun and Anna,
> > > > >
> > > > > Thank you both for your replies.
> > > > >
> > > > > Based on our recent discussion, I agree that using a rate is better
> > to
> > > > > remain
> > > > > consistent with other quotas. As you both suggested, it seems that
> > > > changing
> > > > > the way we compute the rate to better handle spiky workloads and
> > > behave a
> > > > > bit more similarly to the token bucket algorithm makes sense for
> all
> > > > > quotas as
> > > > > well.
> > > > >
> > > > > I will update the KIP to reflect this.
> > > > >
> > > > > Anna, I think that we can explain this in this KIP. We can't just
> say
> > > > that
> > > > > the Rate
> > > > > will be updated in this KIP. I think that we need to give a bit
> more
> > > > info.
> > > > >
> > > > > Best,
> > > > > David
> > > > >
> > > > > On Thu, Jun 4, 2020 at 6:31 AM Anna Povzner 
> > wrote:
> > > > >
> > > > >> Hi Jun and David,
> > > > >>
> > > > >> Regarding token bucket vs, Rate behavior. We recently observed a
> > > couple
> > > > of
> > > > >> cases where a bursty workload behavior would result in long-ish
> > pauses
> > > > in
> > > > >> between, resulting in lower overall bandwidth than the quota. I
> will
> > > > need
> > > > >> to debug this a bit more to be 100% sure, but it does look like
> the
> > > case
> > > > >> described by David earlier in this thread. So, I agree with Jun
> -- I
> > > > think
> > > > >> we should make all quota rate behavior consistent, and probably
> > > similar
> > > > to
> > > > >> the token bucket one.
> > > > >>
> > > > >> Looking at KIP-13, it doesn't describe rate calculation in enough
> > > > detail,
> > > > >> but does mention window size. So, we could keep "window size" and
> > > > "number
> > > > >> of samples" configs and change Rate implementation to be more
> > similar
> > > to
> > > > >> token bucket:
> > > > >> * number of samples define our burst size
> > > > >> * Change the behavior, which could be described as: If a burst
> > happens
> > > > >> after an idle period, the burst would effectively spread evenly
> over
> > > the
> > > > >> (now - window) time period, where window is (<number of samples> - 1)
> > > > >> * <window size>. Which basically describes a token bucket, while
> > > > >> keeping the current quota configs. I think we can even implement this
> > > > >> by changing the way we record the last sample or lastWindowMs.
> > > > >>
> > > > >> Jun, if we would be changing Rate calculation behavior in
> bandwidth
> > > and
> > > > >> request quotas, would we need a separate KIP? Shouldn't need to if
> > we
> > > > >> keep window size and number of samples configs, right?

Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-06-03 Thread Anna Povzner
Hi Jun and David,

Regarding token bucket vs, Rate behavior. We recently observed a couple of
cases where a bursty workload behavior would result in long-ish pauses in
between, resulting in lower overall bandwidth than the quota. I will need
to debug this a bit more to be 100% sure, but it does look like the case
described by David earlier in this thread. So, I agree with Jun -- I think
we should make all quota rate behavior consistent, and probably similar to
the token bucket one.

Looking at KIP-13, it doesn't describe rate calculation in enough detail,
but does mention window size. So, we could keep "window size" and "number
of samples" configs and change Rate implementation to be more similar to
token bucket:
* number of samples define our burst size
* Change the behavior, which could be described as: If a burst happens
after an idle period, the burst would effectively spread evenly over the
(now - window) time period, where window is (<number of samples> - 1) *
<window size>. Which basically describes a token bucket, while keeping the
current quota configs. I think we can even implement this by changing the
way we record the last sample or lastWindowMs.
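
One way to read the proposal above as code (an illustrative sketch of the idea, not the eventual Kafka patch): never let the measurement window shrink below its full configured span, so a burst arriving after an idle period is effectively spread over the whole window, mimicking token-bucket credit.

```python
def measured_rate(total_in_window, elapsed_ms, num_samples, sample_ms):
    """Rate measurement that never uses a window shorter than the full
    configured span of (num_samples - 1) * sample_ms."""
    min_window_ms = (num_samples - 1) * sample_ms
    return total_in_window / (max(elapsed_ms, min_window_ms) / 1000.0)

# 30 units arriving in a 1s burst right after a long idle period:
naive = 30 / 1.0                                  # 30/s: looks like a violation
smoothed = measured_rate(30, 1_000, 11, 1_000)    # spread over the full 10s span
print(naive, smoothed)   # 30.0 3.0
```

Under, say, a quota of 5 units/s, the naive measurement would throttle the burst hard, while the smoothed measurement treats the idle period as earned credit and lets it through.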

Jun, if we would be changing Rate calculation behavior in bandwidth and
request quotas, would we need a separate KIP? Shouldn't need to if we
keep window size and number of samples configs, right?

Thanks,
Anna

On Wed, Jun 3, 2020 at 3:24 PM Jun Rao  wrote:

> Hi, David,
>
> Thanks for the reply.
>
> 11. To match the behavior in the Token bucket approach, I was thinking that
> requests that don't fit in the previous time windows will be accumulated in
> the current time window. So, the 60 extra requests will be accumulated in
> the latest window. Then, the client also has to wait for 12 more secs
> before throttling is removed. I agree that this is probably a better
> behavior and it's reasonable to change the existing behavior to this one.
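
The "12 more secs" can be reconstructed from the toy example quoted later in this thread (a quota of 5 mutations/s, 1s samples, 100 past windows, and a burst of 7 * 80 requests -- these parameters are taken from the thread, the code itself is only illustrative):

```python
quota_per_s = 5
window_s = 1
past_windows = 100            # (number of samples - 1) in the toy example

burst = 7 * 80                # 560 mutations arriving at once
absorbed = quota_per_s * window_s * past_windows   # 500 fit in past windows
overflow = burst - absorbed                         # 60 land in the current window
extra_wait_s = overflow / quota_per_s               # time to pay off the overflow
print(overflow, extra_wait_s)   # 60 12.0
```

So after the 500 requests are retroactively allocated to the empty past windows, the remaining 60 accumulate in the latest window and the client must wait a further 60 / 5 = 12 seconds before throttling is lifted.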
>
> To me, it seems that sample_size * num_windows is the same as max burst
> balance. The latter seems a bit better to configure. The thing is that the
> existing quota system has already been used in quite a few places and if we
> want to change the configuration in the future, there is the migration
> cost. Given that, do you feel it's better to adopt the  new token bucket
> terminology or just adopt the behavior somehow into our existing system? If
> it's the former, it would be useful to document this in the rejected
> section and add a future plan on migrating existing quota configurations.
>
> Jun
>
>
> On Tue, Jun 2, 2020 at 6:55 AM David Jacot  wrote:
>
> > Hi Jun,
> >
> > Thanks for your reply.
> >
> > 10. I think that both options are likely equivalent from an accuracy
> > point of view. If we put the implementation aside, conceptually, I am
> > not convinced by the usage-based approach because resources don't have
> > a clear owner
> > in AK at the moment. A topic can be created by (Principal A, no client
> id),
> > then can be extended by (no principal, Client B), and finally deleted by
> > (Principal C, Client C). This does not sound right to me and I fear that
> it
> > is not going to be easy to grasp for our users.
> >
> > Regarding the naming, I do agree that we can make it more future proof.
> > I propose `controller_mutations_rate`. I think that differentiating the
> > mutations
> > from the reads is still a good thing for the future.
> >
> > 11. I am not convinced by your proposal for the following reasons:
> >
> > First, in my toy example, I used 101 windows and 7 * 80 requests. We
> could
> > effectively allocate 5 * 100 requests to the previous windows assuming
> that
> > they are empty. What shall we do with the remaining 60 requests? Shall we
> > allocate them to the current window or shall we divide them among all the
> > windows?
> >
> > Second, I don't think that we can safely change the behavior of all the
> > existing
> > rates used because it actually changes the computation of the rate as
> > values
> > allocated to past windows would expire before they would today.
> >
> > Overall, while trying to fit in the current rate, we are going to build a
> > slightly
> > different version of the rate which will be even more confusing for
> users.
> >
> > Instead, I think that we should embrace the notion of burst as it could
> > also
> > be applied to other quotas in the future. Users don't have to know that
> > we use the Token Bucket or a special rate inside at the end of the day.
> > It is an implementation detail.
> >
> > Users would be able to define:
> > - a rate R; and
> > - a maximum burst B.
> >
> > If we change the metrics to be as follows:
> > - the actual rate;
> > - the burst balance in %, 0 meaning that the user is throttled;
> > it remains detached from the algorithm.
> >
> > I personally prefer this over having to define a rate and a number of
> > windows
> > while having to understand that the number of windows implicitly defines
> > the
> > allowed burst. I think that it is clearer and easier to 

Re: [VOTE] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-21 Thread Anna Povzner
The vote for KIP-612 has passed with 3 binding and 3 non-binding +1s, and
no objections.


Thanks everyone for reviews and feedback,

Anna

On Tue, May 19, 2020 at 2:41 AM Rajini Sivaram 
wrote:

> +1 (binding)
>
> Thanks for the KIP, Anna!
>
> Regards,
>
> Rajini
>
>
> On Tue, May 19, 2020 at 9:32 AM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
> > +1 (non-binding)
> >
> > Thank you for the KIP!
> >
> >
> > > On Tue, May 19, 2020 at 7:57 AM David Jacot wrote:
> > >
> > > +1 (non-binding)
> > >
> > > Thanks for the KIP, Anna!
> > >
> > > On Tue, May 19, 2020 at 7:12 AM Satish Duggana <
> satish.dugg...@gmail.com
> > >
> > > wrote:
> > >
> > > > +1 (non-binding)
> > > > Thanks Anna for the nice feature to control the connection creation
> > rate
> > > > from the clients.
> > > >
> > > > On Tue, May 19, 2020 at 8:16 AM Gwen Shapira 
> > wrote:
> > > >
> > > > > +1 (binding)
> > > > >
> > > > > Thank you for driving this, Anna
> > > > >
> > > > > On Mon, May 18, 2020 at 4:55 PM Anna Povzner 
> > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I would like to start the vote on KIP-612: Ability to limit
> > connection
> > > > > > creation rate on brokers.
> > > > > >
> > > > > > For reference, here is the KIP wiki:
> > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers
> > > > > >
> > > > > > And discussion thread:
> > > > > >
> > > > > >
> > > > >
> > > >
> >
> https://lists.apache.org/thread.html/r61162661fa307d0bc5c8326818bf223a689c49e1c828c9928ee26969%40%3Cdev.kafka.apache.org%3E
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Anna
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Gwen Shapira
> > > > > Engineering Manager | Confluent
> > > > > 650.450.2760 | @gwenshap
> > > > > Follow us: Twitter | blog
> > > > >
> > > >
> >
>


[jira] [Created] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits

2020-05-19 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10024:


 Summary: Add dynamic configuration and enforce quota for per-IP 
connection rate limits
 Key: KAFKA-10024
 URL: https://issues.apache.org/jira/browse/KAFKA-10024
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Anna Povzner
Assignee: Anna Povzner


This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
rate limits.

As described here: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10023) Enforce broker-wide and per-listener connection creation rate (KIP-612, part 1)

2020-05-19 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-10023:


 Summary: Enforce broker-wide and per-listener connection creation 
rate (KIP-612, part 1)
 Key: KAFKA-10023
 URL: https://issues.apache.org/jira/browse/KAFKA-10023
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Anna Povzner
Assignee: Anna Povzner


This JIRA is for the first part of KIP-612 – Add an ability to configure and 
enforce broker-wide and per-listener connection creation rate. 

As described here: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]





[VOTE] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-18 Thread Anna Povzner
Hi All,

I would like to start the vote on KIP-612: Ability to limit connection
creation rate on brokers.

For reference, here is the KIP wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers

And discussion thread:
https://lists.apache.org/thread.html/r61162661fa307d0bc5c8326818bf223a689c49e1c828c9928ee26969%40%3Cdev.kafka.apache.org%3E

Thanks,

Anna


Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-18 Thread Anna Povzner
I realized the KIP freeze is on May 20. I will start the voting thread now.

On Mon, May 18, 2020 at 3:19 PM Anna Povzner  wrote:

> Thanks everyone for the feedback. I will start a voting thread tomorrow
> morning if there are no more comments.
>
> Regards,
> Anna
>
> On Mon, May 18, 2020 at 2:06 PM Anna Povzner  wrote:
>
>> Hi Boyang,
>>
>> This KIP does not change the protocol with clients. The behavior is the
>> same as with KIP-402 where the broker delays accepting new connections when
>> the limit for the number of connections is reached. This KIP adds another
>> reason for the delay (when the rate limit is reached). The per-IP case is
>> similar to dropping a connection when the per-IP limit is reached, except
>> that this KIP delays the response or may still accept the connection.
>> Clients may time out waiting for the connection, and retry.
>>
>> Thanks,
>> Anna
>>
>> On Mon, May 18, 2020 at 12:54 PM Boyang Chen 
>> wrote:
>>
>>> Hey Anna,
>>>
>>> thanks for the KIP. Will this change be applied as one type of quota
>>> violation, which should be retriable on the client side? For EOS model
>>> before
>>> 2.6, the Streams client creates one producer for each input partition, so
>>> it is actually possible to create thousands of producers when the service
>>> is up. Just want to clarify what's the expected behavior to be seen on
>>> the
>>> client side?
>>>
>>> On Mon, May 18, 2020 at 12:04 PM Anna Povzner  wrote:
>>>
>>> > Hi Alexandre,
>>> >
>>> > Thanks for your comments. My answers are below:
>>> >
>>> > 900. The KIP does not propose any new metrics because we already have
>>> > metrics that will let us monitor connection attempts and the amount of
>>> time
>>> > the broker delays accepting new connections:
>>> > 1. We have a per-listener (and per-processor) metric for connection
>>> > creation rate:
>>> >
>>> >
>>> kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate
>>> > 2. We have per-listener metrics that track the amount of time Acceptor
>>> is
>>> > blocked from accepting connections:
>>> >
>>> >
>>> kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listener_name}
>>> > Note that adding per IP JMX metrics may end up adding a lot of
>>> overhead,
>>> > especially for clusters with a large number of clients and many
>>> different
>>> > IP addresses. If we ever want to add the metric, perhaps we could
>>> propose a
>>> > separate KIP, but that would require some more evaluation of potential
>>> > overhead.
>>> >
>>> > 901. Yes, I updated the wiki with the approach for enforcing per IP
>>> limits
>>> > (not dropping right away), as I described in my response to Rajini.
>>> >
>>> > 902. Any additional stress testing is always super useful. I am going
>>> to
>>> > have a PR with the first half of the KIP ready soon (broker-wide and
>>> > per-listener limits). Perhaps it could be worthwhile to see if it makes
>>> > sense to add stress testing to muckrake tests. Also, check out
>>> connection
>>> > stress workloads in Trogdor and whether they are sufficient or could be
>>> > extended:
>>> >
>>> >
>>> https://github.com/apache/kafka/tree/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload
>>> >
>>> > Regards,
>>> > Anna
>>> >
>>> > On Mon, May 18, 2020 at 8:57 AM Rajini Sivaram <
>>> rajinisiva...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi Anna,
>>> > >
>>> > > Thanks for the response, sounds good.
>>> > >
>>> > > Regards,
>>> > >
>>> > > Rajini
>>> > >
>>> > >
>>> > > On Sun, May 17, 2020 at 1:38 AM Anna Povzner 
>>> wrote:
>>> > >
>>> > > > Hi Rajini,
>>> > > >
>>> > > > Thanks for reviewing the KIP!
>>> > > >
>>> > > > I agree with your suggestion to make per-IP connection rate quota a
>>> > > dynamic
>>> > > > quota for entity name IP. This will allow configuring connection
>>> rate
>>> > > for a
>>> > > > 

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-18 Thread Anna Povzner
Thanks everyone for the feedback. I will start a voting thread tomorrow
morning if there are no more comments.

Regards,
Anna

On Mon, May 18, 2020 at 2:06 PM Anna Povzner  wrote:

> Hi Boyang,
>
> This KIP does not change the protocol with clients. The behavior is the
> same as with KIP-402 where the broker delays accepting new connections when
> the limit for the number of connections is reached. This KIP adds another
> reason for the delay (when the rate limit is reached). The per-IP case is
> similar to dropping a connection when the per-IP limit is reached, except
> that this KIP delays the response or may still accept the connection.
> Clients may time out waiting for the connection, and retry.
>
> Thanks,
> Anna
>
> On Mon, May 18, 2020 at 12:54 PM Boyang Chen 
> wrote:
>
>> Hey Anna,
>>
>> thanks for the KIP. Will this change be applied as one type of quota
>> violation, which should be retriable on the client side? For EOS model before
>> 2.6, the Streams client creates one producer for each input partition, so
>> it is actually possible to create thousands of producers when the service
>> is up. Just want to clarify what's the expected behavior to be seen on the
>> client side?
>>
>> On Mon, May 18, 2020 at 12:04 PM Anna Povzner  wrote:
>>
>> > Hi Alexandre,
>> >
>> > Thanks for your comments. My answers are below:
>> >
>> > 900. The KIP does not propose any new metrics because we already have
>> > metrics that will let us monitor connection attempts and the amount of
>> time
>> > the broker delays accepting new connections:
>> > 1. We have a per-listener (and per-processor) metric for connection
>> > creation rate:
>> >
>> >
>> kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate
>> > 2. We have per-listener metrics that track the amount of time Acceptor
>> is
>> > blocked from accepting connections:
>> >
>> >
>> kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listener_name}
>> > Note that adding per IP JMX metrics may end up adding a lot of overhead,
>> > especially for clusters with a large number of clients and many
>> different
>> > IP addresses. If we ever want to add the metric, perhaps we could
>> propose a
>> > separate KIP, but that would require some more evaluation of potential
>> > overhead.
>> >
>> > 901. Yes, I updated the wiki with the approach for enforcing per IP
>> limits
>> > (not dropping right away), as I described in my response to Rajini.
>> >
>> > 902. Any additional stress testing is always super useful. I am going to
>> > have a PR with the first half of the KIP ready soon (broker-wide and
>> > per-listener limits). Perhaps it could be worthwhile to see if it makes
>> > sense to add stress testing to muckrake tests. Also, check out
>> connection
>> > stress workloads in Trogdor and whether they are sufficient or could be
>> > extended:
>> >
>> >
>> https://github.com/apache/kafka/tree/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload
>> >
>> > Regards,
>> > Anna
>> >
>> > On Mon, May 18, 2020 at 8:57 AM Rajini Sivaram > >
>> > wrote:
>> >
>> > > Hi Anna,
>> > >
>> > > Thanks for the response, sounds good.
>> > >
>> > > Regards,
>> > >
>> > > Rajini
>> > >
>> > >
>> > > On Sun, May 17, 2020 at 1:38 AM Anna Povzner 
>> wrote:
>> > >
>> > > > Hi Rajini,
>> > > >
>> > > > Thanks for reviewing the KIP!
>> > > >
>> > > > I agree with your suggestion to make per-IP connection rate quota a
>> > > dynamic
>> > > > quota for entity name IP. This will allow configuring connection
>> rate
>> > > for a
>> > > > particular IP as well. I updated the wiki accordingly.
>> > > >
>> > > > Your second concern makes sense -- rejecting the connection right
>> away
>> > > will
>> > > > likely cause a new connection from the same client. I am concerned
>> > about
>> > > > delaying new connections for processing later, because if the
>> > connections
>> > > > keep coming with the high rate, there may be potentially a large
>> > backlog
>> > > > and connections may start timing out before the broker gets to
>> > 

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-18 Thread Anna Povzner
Hi Boyang,

This KIP does not change the protocol with clients. The behavior is the
same as with KIP-402 where the broker delays accepting new connections when
the limit for the number of connections is reached. This KIP adds another
reason for the delay (when the rate limit is reached). The per-IP case is
similar to dropping a connection when the per-IP limit is reached, except
that this KIP delays the response or may still accept the connection.
Clients may time out waiting for the connection, and retry.
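For illustration, a client that hits such a delay simply sees a slow or
timed-out connection attempt and can retry with backoff; a minimal sketch
(hypothetical helper, not part of any Kafka client library):

```python
import socket
import time


def connect_with_retry(host, port, timeout_s=1.0, retries=3, backoff_s=0.5):
    """Open a TCP connection, retrying with exponential backoff on failure."""
    for attempt in range(retries):
        try:
            return socket.create_connection((host, port), timeout=timeout_s)
        except OSError:  # includes socket.timeout
            if attempt == retries - 1:
                raise
            # Back off before the next attempt so a throttled broker gets
            # a chance to fall back under its connection-rate quota.
            time.sleep(backoff_s * (2 ** attempt))
```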

Thanks,
Anna

On Mon, May 18, 2020 at 12:54 PM Boyang Chen 
wrote:

> Hey Anna,
>
> thanks for the KIP. Will this change be applied as one type of quota
> violation, which should be retriable on the client side? For EOS model before
> 2.6, the Streams client creates one producer for each input partition, so
> it is actually possible to create thousands of producers when the service
> is up. Just want to clarify what's the expected behavior to be seen on the
> client side?
>
> On Mon, May 18, 2020 at 12:04 PM Anna Povzner  wrote:
>
> > Hi Alexandre,
> >
> > Thanks for your comments. My answers are below:
> >
> > 900. The KIP does not propose any new metrics because we already have
> > metrics that will let us monitor connection attempts and the amount of
> time
> > the broker delays accepting new connections:
> > 1. We have a per-listener (and per-processor) metric for connection
> > creation rate:
> >
> >
> kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate
> > 2. We have per-listener metrics that track the amount of time Acceptor is
> > blocked from accepting connections:
> >
> >
> kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listener_name}
> > Note that adding per IP JMX metrics may end up adding a lot of overhead,
> > especially for clusters with a large number of clients and many different
> > IP addresses. If we ever want to add the metric, perhaps we could
> propose a
> > separate KIP, but that would require some more evaluation of potential
> > overhead.
> >
> > 901. Yes, I updated the wiki with the approach for enforcing per IP
> limits
> > (not dropping right away), as I described in my response to Rajini.
> >
> > 902. Any additional stress testing is always super useful. I am going to
> > have a PR with the first half of the KIP ready soon (broker-wide and
> > per-listener limits). Perhaps it could be worthwhile to see if it makes
> > sense to add stress testing to muckrake tests. Also, check out connection
> > stress workloads in Trogdor and whether they are sufficient or could be
> > extended:
> >
> >
> https://github.com/apache/kafka/tree/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload
> >
> > Regards,
> > Anna
> >
> > On Mon, May 18, 2020 at 8:57 AM Rajini Sivaram 
> > wrote:
> >
> > > Hi Anna,
> > >
> > > Thanks for the response, sounds good.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > > On Sun, May 17, 2020 at 1:38 AM Anna Povzner 
> wrote:
> > >
> > > > Hi Rajini,
> > > >
> > > > Thanks for reviewing the KIP!
> > > >
> > > > I agree with your suggestion to make per-IP connection rate quota a
> > > dynamic
> > > > quota for entity name IP. This will allow configuring connection rate
> > > for a
> > > > particular IP as well. I updated the wiki accordingly.
> > > >
> > > > Your second concern makes sense -- rejecting the connection right
> away
> > > will
> > > > likely cause a new connection from the same client. I am concerned
> > about
> > > > delaying new connections for processing later, because if the
> > connections
> > > > keep coming with the high rate, there may be potentially a large
> > backlog
> > > > and connections may start timing out before the broker gets to
> > processing
> > > > them. For example, if clients come through proxy, there may be
> > > > potentially a large number of incoming connections with the same IP.
> > > >
> > > > What do you think about the following option:
> > > > * Once per-IP connection rate reaches the limit, accept or drop
> (clean
> > > up)
> > > > the connection after a delay depending on whether the quota is still
> > > > violated. We could re-use the mechanism implemented with KIP-306
> where
> > > the
> > > > broker delays the response for fail

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-18 Thread Anna Povzner
Hi Alexandre,

Thanks for your comments. My answers are below:

900. The KIP does not propose any new metrics because we already have
metrics that will let us monitor connection attempts and the amount of time
the broker delays accepting new connections:
1. We have a per-listener (and per-processor) metric for connection
creation rate:
kafka.server:type=socket-server-metrics,listener={listener_name},networkProcessor={#},name=connection-creation-rate
2. We have per-listener metrics that track the amount of time Acceptor is
blocked from accepting connections:
kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listener_name}
Note that adding per IP JMX metrics may end up adding a lot of overhead,
especially for clusters with a large number of clients and many different
IP addresses. If we ever want to add the metric, perhaps we could propose a
separate KIP, but that would require some more evaluation of potential
overhead.

901. Yes, I updated the wiki with the approach for enforcing per IP limits
(not dropping right away), as I described in my response to Rajini.

902. Any additional stress testing is always super useful. I am going to
have a PR with the first half of the KIP ready soon (broker-wide and
per-listener limits). Perhaps it could be worthwhile to see if it makes
sense to add stress testing to muckrake tests. Also, check out connection
stress workloads in Trogdor and whether they are sufficient or could be
extended:
https://github.com/apache/kafka/tree/trunk/tools/src/main/java/org/apache/kafka/trogdor/workload
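A connection-storm load for this kind of stress testing can also be
generated with something as simple as the following standalone sketch (not
a Trogdor workload; host, port, and rate are made-up parameters):

```python
import socket
import time


def connection_storm(host, port, rate_per_s, duration_s):
    """Open (and immediately close) connections at a target rate.

    Returns the number of connections actually established; refused or
    timed-out attempts are expected once the broker starts throttling.
    """
    opened = 0
    deadline = time.time() + duration_s
    while time.time() < deadline:
        start = time.time()
        try:
            conn = socket.create_connection((host, port), timeout=1.0)
            conn.close()
            opened += 1
        except OSError:
            pass  # throttled/refused connections are part of the scenario
        # Pace attempts to approximate the target rate.
        time.sleep(max(0.0, 1.0 / rate_per_s - (time.time() - start)))
    return opened
```

Pointing this at a broker with and without the quota configured would give
a rough before/after comparison of accept delays.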

Regards,
Anna

On Mon, May 18, 2020 at 8:57 AM Rajini Sivaram 
wrote:

> Hi Anna,
>
> Thanks for the response, sounds good.
>
> Regards,
>
> Rajini
>
>
> On Sun, May 17, 2020 at 1:38 AM Anna Povzner  wrote:
>
> > Hi Rajini,
> >
> > Thanks for reviewing the KIP!
> >
> > I agree with your suggestion to make per-IP connection rate quota a
> dynamic
> > quota for entity name IP. This will allow configuring connection rate
> for a
> > particular IP as well. I updated the wiki accordingly.
> >
> > Your second concern makes sense -- rejecting the connection right away
> will
> > likely cause a new connection from the same client. I am concerned about
> > delaying new connections for processing later, because if the connections
> > keep coming with the high rate, there may be potentially a large backlog
> > and connections may start timing out before the broker gets to processing
> > them. For example, if clients come through proxy, there may be
> > potentially a large number of incoming connections with the same IP.
> >
> > What do you think about the following option:
> > * Once per-IP connection rate reaches the limit, accept or drop (clean
> up)
> > the connection after a delay depending on whether the quota is still
> > violated. We could re-use the mechanism implemented with KIP-306 where
> the
> > broker delays the response for failed client authentication. The delay
> will
> > be set to min(delay calculated based on the rate quota, 1 second), which
> > matches the max delay for request quota.
> >
> > I think this option is somewhat your suggestion with delaying accepting
> per
> > IP connections that reached the rate limit, but with protection in place
> to
> > make sure the number of delayed connections does not blow up. What do you
> > think?
> >
> > Thanks,
> > Anna
> >
> > On Sat, May 16, 2020 at 1:09 AM Alexandre Dupriez <
> > alexandre.dupr...@gmail.com> wrote:
> >
> > > Hi Anna,
> > >
> > > Thank you for your answers and explanations.
> > >
> > > A couple of additional comments:
> > >
> > > 900. KIP-612 does not intend to dedicate a metric to the throttling of
> > > incoming connections. I wonder if such a metric would be handy for
> > > monitoring and help set-up metric-based alarming if one wishes to
> > > capture this type of incident?
> > >
> > > 901. Following-up on Rajini's point 2 above - from my understanding,
> > > this new quota should prevent excess CPU consumption in
> > > Processor#run() method when a new connection has been accepted.
> > > Through the throttling in place, connections will be delayed as
> > > indicated by the KIP's specifications:
> > >
> > > " If connection creation rate on the broker exceeds the broker-wide
> > > limit, the broker will delay accepting a new connection by an amount
> > > of time that brings the rate within the limit."
> > >
> > > You may be referring to the following sentence:
> > >
> > > "A new broker configuration option wil

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-16 Thread Anna Povzner
Hi Rajini,

Thanks for reviewing the KIP!

I agree with your suggestion to make per-IP connection rate quota a dynamic
quota for entity name IP. This will allow configuring connection rate for a
particular IP as well. I updated the wiki accordingly.

Your second concern makes sense -- rejecting the connection right away will
likely cause a new connection from the same client. I am concerned about
delaying new connections for processing later, because if the connections
keep coming with the high rate, there may be potentially a large backlog
and connections may start timing out before the broker gets to processing
them. For example, if clients come through proxy, there may be
potentially a large number of incoming connections with the same IP.

What do you think about the following option:
* Once per-IP connection rate reaches the limit, accept or drop (clean up)
the connection after a delay depending on whether the quota is still
violated. We could re-use the mechanism implemented with KIP-306 where the
broker delays the response for failed client authentication. The delay will
be set to min(delay calculated based on the rate quota, 1 second), which
matches the max delay for request quota.
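A rough sketch of that delay rule (illustrative Python; the formula mirrors
the Rate-based throttle-time calculation, and all parameter values here are
made up):

```python
MAX_THROTTLE_S = 1.0  # cap matching the max delay for request quotas


def connection_delay_s(observed_rate, quota, window_s):
    """Delay needed to bring the observed rate back under the quota,
    capped at MAX_THROTTLE_S."""
    if observed_rate <= quota:
        return 0.0
    # Time over which the excess must be amortized to fall back within
    # the quota, proportional to how far over the limit we are.
    rate_based_delay = (observed_rate - quota) / quota * window_s
    return min(rate_based_delay, MAX_THROTTLE_S)


print(connection_delay_s(observed_rate=8.0, quota=10.0, window_s=10.0))   # 0.0
print(connection_delay_s(observed_rate=10.5, quota=10.0, window_s=10.0))  # 0.5
print(connection_delay_s(observed_rate=20.0, quota=10.0, window_s=10.0))  # capped at 1.0
```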

I think this option is somewhat your suggestion with delaying accepting per
IP connections that reached the rate limit, but with protection in place to
make sure the number of delayed connections does not blow up. What do you
think?

Thanks,
Anna

On Sat, May 16, 2020 at 1:09 AM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hi Anna,
>
> Thank you for your answers and explanations.
>
> A couple of additional comments:
>
> 900. KIP-612 does not intend to dedicate a metric to the throttling of
> incoming connections. I wonder if such a metric would be handy for
> monitoring and help set-up metric-based alarming if one wishes to
> capture this type of incident?
>
> 901. Following-up on Rajini's point 2 above - from my understanding,
> this new quota should prevent excess CPU consumption in
> Processor#run() method when a new connection has been accepted.
> Through the throttling in place, connections will be delayed as
> indicated by the KIP's specifications:
>
> " If connection creation rate on the broker exceeds the broker-wide
> limit, the broker will delay accepting a new connection by an amount
> of time that brings the rate within the limit."
>
> You may be referring to the following sentence:
>
> "A new broker configuration option will be added to limit the rate at
> which connections will be accepted for each IP address. New
> connections for the IP will be dropped once the limit is reached."?
>
> 902. It may be interesting to capture the data with and without
> connection throttling under stress scenarios. You may have these data
> already. If you need a pair of hands to do some stress tests once you
> have a POC or a PR, I am happy to contribute :)
>
> Thanks,
> Alexandre
>
> > On Fri, May 15, 2020 at 12:22 PM Rajini Sivaram wrote:
> >
> > Hi Anna,
> >
> > Thanks for the KIP, looks good overall. A couple of comments about per-IP
> > connection quotas:
> >
> > 1) Should we consider making per-IP quota similar to other quotas?
> > Configured as a dynamic quota for entity type IP, with per-IP limit as
> well
> > as defaults? Perhaps that would fit better rather than configs?
> >
> > 2) The current proposal drops connections after accepting connections for
> > per-IP limit. We do this in other cases too, but in this case, should we
> > throttle instead? My point is what is the quota protecting? If we want to
> > limit rate of accepted connections, then accepting a connection and then
> > dropping doesn't really help since that IP is going to reconnect. If we
> > want to rate limit what happens next, i.e. authentication, then
> > throttling the accepted connection so its processing is delayed would
> > perhaps be better?
> >
> > Regards,
> >
> > Rajini
> >
> > On Thu, May 14, 2020 at 4:12 PM David Jacot  wrote:
> >
> > > Hi Anna,
> > >
> > > Thanks for your answers and the updated KIP. Looks good to me!
> > >
> > > Best,
> > > David
> > >
> > > On Thu, May 14, 2020 at 12:54 AM Anna Povzner 
> wrote:
> > >
> > > > I updated the KIP to add a new broker configuration to limit
> connection
> > > > creation rate per IP: max.connection.creation.rate.per.ip. Once the
> limit
> > > > is reached for a particular IP address, the broker will reject the
> > > > connection from that IP (close the connection it accepted) and
> continue
> > > > rejecting them until the rate is back

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-13 Thread Anna Povzner
I updated the KIP to add a new broker configuration to limit connection
creation rate per IP: max.connection.creation.rate.per.ip. Once the limit
is reached for a particular IP address, the broker will reject the
connection from that IP (close the connection it accepted) and continue
rejecting them until the rate is back within the rate limit.

On Wed, May 13, 2020 at 11:46 AM Anna Povzner  wrote:

> Hi David and Alexandre,
>
> Thanks so much for your feedback! Here are my answers:
>
> 1. Yes, we have seen several cases of clients that create a new connection
> per produce/consume request. One hypothesis is someone who is used to
> connection pooling may accidentally write a Kafka client that creates a new
> connection every time.
>
> 2 & 4. That's a good point I haven't considered. I think it makes sense to
> provide an ability to limit connection creations per IP as well. This is
> not hard to implement -- the broker already keeps track of the number of
> connections per IP, and immediately closes a new connection if it comes
> from an IP that reached the connection limit. So, we could additionally
> track the rate, and close the connection from IP that exceeds the rate. One
> slight concern is whether keeping track of per IP rates and quotas adds
> overhead (CPU and memory). But perhaps it is not a problem if we use
> expiring sensors.
>
> It would still make sense to limit the overall connection creation rate
> for the Kafka clusters which are shared among many different
> applications/clients, since they may spike at the same time bringing the
> total rate too high.
>
> 3. Controlling connection queue sizes only controls the share of time
> network threads use for creating new connections (and accepting on Acceptor
> thread) vs. doing other work on each Processor iteration. It does not
> directly control how processing connection creations would be related to
> other processing done by brokers like on request handler threads. So, while
> controlling queue size may mitigate the issue for some of the workloads, it
> does not guarantee that. Plus, if we want to limit how many connections are
> created per IP, the queue size approach would not work, unless we go with a
> "share" of the queue, which I think even further obscures what that setting
> means (and what we would achieve as an end result). Does this answer the
> question?
>
> If there are no objections, I will update the KIP to add per IP connection
> rate limits (config and enforcement).
>
> Thanks,
>
> Anna
>
>
> On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
> alexandre.dupr...@gmail.com> wrote:
>
>> Hello,
>>
>> Thank you for the KIP.
>>
>> I experienced in the past genuine broker brownouts due to connection
>> storms consuming most of the CPU available on the server and I think
>> it is useful to protect against it.
>>
>> I tend to share the questions asked in points 2 and 4 from David. Is
>> there still a risk of denial of service if the limit applies at the
>> listener-level without differentiating between (an) “offending”
>> client(s) and the others?
>>
>> To follow up on point 3 - conceptually, one difference between capping
>> the queue size or throttling as presented in the KIP would come from
>> the time it takes to accept a connection and how that time evolves
>> with the connection rate.
>> Assuming that that time increases monotonically with resource
>> utilization, the admissible rate of connections would decrease as the
>> server becomes more loaded, if the limit was set on queue size.
>>
>> Thanks,
>> Alexandre
>>
>> > On Tue, May 12, 2020 at 8:49 AM David Jacot wrote:
>> >
>> > Hi Anna,
>> >
>> > Thanks for the KIP! I have few questions:
>> >
>> > 1. You mention that some clients may create a new connections for each
>> > requests: "Another example is clients that create a new connection for
>> each
>> > produce/consume request". I am curious here but do we know any clients
>> > behaving like this?
>> >
>> > 2. I am a bit concerned by the impact of misbehaving clients on the
>> other
>> > ones. Let's say that we define a quota of 10 connections / sec for a
>> broker
>> > and that we have a misbehaving application constantly trying to create
>> 20
>> > connections on that broker. That application will constantly hit the
>> quota
>> > and
>> > always have many pending connections in the queue waiting to be
>> accepted.
>> > Regular clients trying to connect would need to wait until all the
>> pending
>> > co

Re: [DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-13 Thread Anna Povzner
Hi David and Alexandre,

Thanks so much for your feedback! Here are my answers:

1. Yes, we have seen several cases of clients that create a new connection
per produce/consume request. One hypothesis is that someone used to
connection pooling may accidentally write a Kafka client that creates a new
connection every time.

2 & 4. That's a good point I haven't considered. I think it makes sense to
provide an ability to limit connection creations per IP as well. This is
not hard to implement -- the broker already keeps track of the number of
connections per IP, and immediately closes a new connection if it comes
from an IP that reached the connection limit. So, we could additionally
track the rate, and close the connection from IP that exceeds the rate. One
slight concern is whether keeping track of per IP rates and quotas adds
overhead (CPU and memory). But perhaps it is not a problem if we use
expiring sensors.
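To make the per-IP idea concrete, here is a minimal sliding-window sketch of connection rate limiting (illustrative Python, not the broker's actual Scala implementation; the class and method names are invented):

```python
import time
from collections import defaultdict, deque

class PerIpConnectionRateLimiter:
    """Track connection-creation timestamps per IP and reject a new
    connection when the rate over the last window exceeds the limit."""

    def __init__(self, max_rate_per_sec, window_sec=1.0):
        self.max_rate = max_rate_per_sec
        self.window = window_sec
        self.events = defaultdict(deque)  # ip -> timestamps of accepted connections

    def try_accept(self, ip, now=None):
        now = time.monotonic() if now is None else now
        q = self.events[ip]
        # Drop timestamps that fell outside the sliding window.
        while q and now - q[0] > self.window:
            q.popleft()
        if len(q) >= self.max_rate * self.window:
            return False  # caller closes the connection immediately
        q.append(now)
        return True
```

A real implementation would also need to expire idle per-IP entries, which is what the expiring sensors mentioned above would provide.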

It would still make sense to limit the overall connection creation rate for
Kafka clusters that are shared among many different applications/clients,
since they may spike at the same time, bringing the total rate too high.

3. Controlling connection queue sizes only controls the share of time
network threads use for creating new connections (and accepting on Acceptor
thread) vs. doing other work on each Processor iteration. It does not
directly control how the processing of connection creations relates to
other processing done by brokers, such as on request handler threads. So, while
controlling queue size may mitigate the issue for some of the workloads, it
does not guarantee that. Plus, if we want to limit how many connections are
created per IP, the queue size approach would not work, unless we go with a
"share" of the queue, which I think even further obscures what that setting
means (and what we would achieve as an end result). Does this answer the
question?

If there are no objections, I will update the KIP to add per IP connection
rate limits (config and enforcement).

Thanks,

Anna


On Tue, May 12, 2020 at 11:25 AM Alexandre Dupriez <
alexandre.dupr...@gmail.com> wrote:

> Hello,
>
> Thank you for the KIP.
>
> I experienced in the past genuine broker brownouts due to connection
> storms consuming most of the CPU available on the server and I think
> it is useful to protect against it.
>
> I tend to share the questions asked in points 2 and 4 from David. Is
> there still a risk of denial of service if the limit applies at the
> listener-level without differentiating between (an) “offending”
> client(s) and the others?
>
> To rebound on point 3 - conceptually one difference between capping
> the queue size or throttling as presented in the KIP would come from
> the time it takes to accept a connection and how that time evolves
> with the connection rate.
> Assuming that that time increases monotonically with resource
> utilization, the admissible rate of connections would decrease as the
> server becomes more loaded, if the limit was set on queue size.
>
> Thanks,
> Alexandre
>
> Le mar. 12 mai 2020 à 08:49, David Jacot  a écrit :
> >
> > Hi Anna,
> >
> > Thanks for the KIP! I have a few questions:
> >
> > 1. You mention that some clients may create a new connection for each
> > request: "Another example is clients that create a new connection for
> each
> > produce/consume request". I am curious here but do we know any clients
> > behaving like this?
> >
> > 2. I am a bit concerned by the impact of misbehaving clients on the other
> > ones. Let's say that we define a quota of 10 connections / sec for a
> broker
> > and that we have a misbehaving application constantly trying to create 20
> > connections on that broker. That application will constantly hit the
> quota
> > and
> > always have many pending connections in the queue waiting to be accepted.
> > Regular clients trying to connect would need to wait until all the
> pending
> > connections upfront in the queue are drained in the best case scenario or
> > won't be able to connect at all in the worst case scenario if the queue
> is
> > full.
> > Does it sound like a valid concern? How do you see this?
> >
> > 3. As you mention it in the KIP, we use bounded queues which already
> limit
> > the maximum number of connections that can be accepted. I wonder if we
> > could reach the same goal by making the size of the queue configurable.
> >
> > 4. Did you consider doing something similar to the connections quota
> which
> > limits the number of connections per IP? Instead of rate limiting all the
> > creation,
> > we could perhaps rate limit the number of creation per IP as well. That
> > could
> > perhaps r

[DISCUSS] KIP-612: Ability to Limit Connection Creation Rate on Brokers

2020-05-11 Thread Anna Povzner
Hi,

I just created KIP-612 to allow limiting connection creation rate on
brokers, and would like to start a discussion.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers

Feedback and suggestions are welcome!

Thanks,
Anna


Re: KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations

2020-05-06 Thread Anna Povzner
Hi David and Jun,

I wanted to add to the discussion about using requests/sec vs. time on
server threads (similar to request quota) for expressing quota for topic
ops.

I think request quota does not protect the brokers from overload by itself
-- it still requires tuning and sometimes re-tuning, because it depends on
the workload behavior of all users (like a relative share of requests
exempt from throttling). This makes it not that easy to set. Let me give
you more details:

1. The amount of work that the user can get from the request quota depends
   on the load from other users. We measure and enforce the user's clock time
   on threads: the time between two timestamps, one when the operation starts
   and one when the operation ends. If the user is the only load on the
   broker, it is less likely that their operation will be interrupted by the
   kernel to switch to another thread, and time away from the thread still
   counts.
   - Pros: this makes it more work-conserving; the user is less limited
     when there are more resources available.
   - Cons: harder to capacity plan for the user, and could be confusing
     when the broker suddenly stops supporting the load which it was
     supporting before.
2. For the above reason, it makes most sense to maximize the user's quota
   and set it as a percent of the maximum thread capacity (1100 with default
   broker config).
3. However, the actual maximum thread capacity is not really 1100:
   - Some of it will be taken by requests exempt from throttling, and the
     amount depends on the workload. We have seen cases (somewhat rare) where
     requests exempt from throttling take like ⅔ of the time on threads.
   - We have also seen cases of an overloaded cluster (full queues,
     timeouts, etc.) due to high request rate while the time used on threads
     was way below the max (1100), like 600 or 700 (total exempt + non-exempt
     usage). Basically, when a broker is close to 100% CPU, it takes more and
     more time for the "unaccounted" work, like a thread getting a chance to
     pick up a request from the queue and get a timestamp.
4. As a result, there will be some tuning to decide on a safe value for
   total thread capacity, from where users can carve out their quotas. Some
   changes in users' workloads may require re-tuning if, for example, a
   workload change dramatically alters the relative share of non-exempt load.
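As a back-of-the-envelope illustration of the 1100 figure and of how usage compares against it (function names are mine, and the thread counts assume the default 8 request handler threads plus 3 network threads):

```python
def usage_percent(used_thread_seconds, window_seconds):
    """A user's measured load: wall-clock thread-seconds consumed over the
    quota window, expressed as a percent of one thread (summed across
    threads, so it can exceed 100)."""
    return 100.0 * used_thread_seconds / window_seconds

def capacity_percent(io_threads=8, network_threads=3):
    """Theoretical maximum: every thread busy 100% of the time. With the
    assumed default broker config this is the 1100 figure quoted above."""
    return 100 * (io_threads + network_threads)
```

For example, 5.5 thread-seconds of work in a 1-second window is 550, only half of the nominal 1100 capacity, yet the broker can already be overloaded as described above.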


I think request quota works well for client request load in a sense that it
ensures that different users get a fair/proportional share of resources
during high broker load. If the user cannot get enough resources from their
quota to support their request rate anymore, they can monitor their load
and expand the cluster if needed (or rebalance).

However, I think using time on threads for topic ops could be even more
difficult than simple request rate (as proposed):

1. I understand that we don't only care about topic requests tying up the
   controller thread, but we also care that it does not create a large extra
   load on the cluster due to LeaderAndIsr and other related requests (this
   is more important for small clusters).
2. For that reason, tuning quota in terms of time on threads can be harder,
   because there is no easy way to say how this quota would translate to a
   number of operations (because that would depend on other broker load).


Since tuning would be required anyway, I see the following workflow if we
express controller quota in terms of partition mutations per second:

1. Run the topic workload in isolation (the most expensive one, like create
   topic vs. add partitions) and see how much load it adds based on incoming
   rate. Choose the quota depending on how much extra load your cluster can
   sustain in addition to its normal load.
2. It could be useful to publish some experimental results to give some
   ballpark numbers to make this sizing easier.


I am interested to see if you agree with the listed assumptions here. I may
have missed something, especially if there is an easier workflow for
setting quota based on time on threads.

Thanks,

Anna


On Thu, Apr 30, 2020 at 8:13 AM Tom Bentley  wrote:

> Hi David,
>
> Thanks for the KIP.
>
> If I understand the proposed throttling algorithm, an initial request would
> be allowed (possibly making K negative) and only subsequent requests
> (before K became positive) would receive the QUOTA_VIOLATED. That would
> mean it was still possible to block the controller from handling other
> events – you just need to do so via making one big request.
>
> While the reasons for rejecting execution throttling make sense given the
> RPCs we have today that seems to be at the cost of still allowing harm to
> the cluster, or did I misunderstand?
>
> Kind regards,
>
> Tom
>
>
>
> On Tue, Apr 28, 2020 at 1:49 AM Jun Rao  wrote:
>
> > Hi, David,
> >
> > Thanks for the reply. A few more 

[jira] [Created] (KAFKA-9839) IllegalStateException on metadata update when broker learns about its new epoch after the controller

2020-04-08 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-9839:
---

 Summary: IllegalStateException on metadata update when broker 
learns about its new epoch after the controller
 Key: KAFKA-9839
 URL: https://issues.apache.org/jira/browse/KAFKA-9839
 Project: Kafka
  Issue Type: Bug
  Components: controller, core
Affects Versions: 2.3.1
Reporter: Anna Povzner


Broker throws "java.lang.IllegalStateException: Epoch XXX larger than current 
broker epoch YYY"  on UPDATE_METADATA when the controller learns about the 
broker epoch and sends UPDATE_METADATA before KafkaZkClient.registerBroker 
completes (the broker learns about its new epoch).

Here is the scenario we observed in more detail:
1. ZK session expires on broker 1
2. Broker 1 establishes new session to ZK and creates znode
3. Controller learns about broker 1 and assigns epoch
4. Broker 1 receives UPDATE_METADATA from controller, but it does not know 
about its new epoch yet, so we get an exception:

ERROR [KafkaApi-3] Error when handling request: clientId=1, correlationId=0, 
api=UPDATE_METADATA, body={
.
java.lang.IllegalStateException: Epoch XXX larger than current broker epoch YYY 
at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2725) at 
kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:320) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:139) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:748)

5. KafkaZkClient.registerBroker completes on broker 1: "INFO Stat of the 
created znode at /brokers/ids/1"

The result is that the broker has stale metadata for some time.

Possible solutions:
1. Broker returns a more specific error and controller retries UPDATE_MEDATA
2. Broker accepts UPDATE_METADATA with larger broker epoch.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9677) Low consume bandwidth quota may cause consumer not being able to fetch data

2020-03-06 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-9677:
---

 Summary: Low consume bandwidth quota may cause consumer not being 
able to fetch data
 Key: KAFKA-9677
 URL: https://issues.apache.org/jira/browse/KAFKA-9677
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.1, 2.4.0, 2.2.2, 2.1.1, 2.0.1
Reporter: Anna Povzner
Assignee: Anna Povzner


When we changed quota communication with KIP-219, fetch requests get throttled 
by returning an empty response with the delay in `throttle_time_ms`, and the 
Kafka consumer retries again after the delay. 

With default configs, the maximum fetch size could be as big as 50MB (or 10MB 
per partition). The default broker config (1-second window, 10 full windows of 
tracked bandwidth/thread utilization usage) means that a < 5MB/s consumer quota 
(per broker) may stop a fetch request from ever being successful.

Or the other way around: 1 MB/s consumer quota (per broker) means that any 
fetch request that gets >= 10MB of data (10 seconds * 1MB/second) in the 
response will never get through.
h3. Proposed fix

Return less data in the fetch response in this case: cap `fetchMaxBytes` passed to 
replicaManager.fetchMessages() from KafkaApis.handleFetchRequest() to quota * 
quota window size (in seconds). In the example of default configs and 
1MB/s consumer bandwidth quota, fetchMaxBytes will be 10MB.
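The proposed cap can be sketched as follows (illustrative Python; the parameter names are assumptions, not Kafka's actual identifiers):

```python
def capped_fetch_max_bytes(fetch_max_bytes, quota_bytes_per_sec,
                           sample_window_sec=1, num_samples=10):
    """Cap the requested fetch size at the number of bytes the quota admits
    over the full set of quota sample windows, so that a single response can
    always eventually get through the throttle."""
    quota_window_bytes = int(quota_bytes_per_sec * sample_window_sec * num_samples)
    return min(fetch_max_bytes, quota_window_bytes)
```

With the default 10 one-second sample windows and a 1 MB/s quota, the cap works out to 10MB, matching the example above.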



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9658) Removing default user quota doesn't take effect until broker restart

2020-03-04 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-9658:
---

 Summary: Removing default user quota doesn't take effect until 
broker restart
 Key: KAFKA-9658
 URL: https://issues.apache.org/jira/browse/KAFKA-9658
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.1, 2.4.0, 2.2.2, 2.1.1, 2.0.1
Reporter: Anna Povzner
Assignee: Anna Povzner


To reproduce (for any quota type: produce, consume, and request):

Example with consumer quota, assuming no user/client quotas are set initially.
1. Set a default user consumer quota:

./kafka-configs.sh --zookeeper <zookeeper-connect> --alter --add-config 
'consumer_byte_rate=1' --entity-type users --entity-default

2. Send some consume load for some user, say user1.

3. Remove the default user consumer quota using:

./kafka-configs.sh --zookeeper <zookeeper-connect> --alter --delete-config 
'consumer_byte_rate' --entity-type users --entity-default

Result: --describe (as below) returns the correct result that there is no quota, 
but the quota bound in ClientQuotaManager.metrics does not get updated for users 
that were sending load, which causes the broker to continue throttling requests 
with the previously set quota.

/opt/confluent/bin/kafka-configs.sh --zookeeper <zookeeper-connect> --describe 
--entity-type users --entity-default



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-546: Add quota-specific APIs to the Admin Client, redux

2020-01-24 Thread Anna Povzner
Hi Brian,

The KIP looks good!

I have one clarification question regarding the distinction between
describe and resolve API. Suppose I set request quota for
/config/users/”user-1”/clients/"client-1" to 100 and request quota for
/config/users/”user-1” to 200. Is this correct that describeClientQuotas
called with /config/users/”user-1” would return two entries in the response?

   - /config/users/”user-1”/clients/”client-1”, request quota type, value = 100

   - /config/users/”user-1”, request quota type, value = 200


While resolve API for entity "/config/users/”user-1” would return the quota
setting specifically for /config/users/”user-1”, which is 200 in this case.

Is my understanding correct?
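A small sketch of the interpretation described above (illustrative Python, not the proposed Admin API; function names are invented):

```python
def describe_quotas(quotas, prefix):
    """Describe: return every configured entry at or under the entity path."""
    return {p: v for p, v in quotas.items() if p.startswith(prefix)}

def resolve_quota(quotas, path):
    """Resolve: return the one value in effect for exactly this entity,
    falling back to less specific ancestors when no exact entry exists."""
    while path:
        if path in quotas:
            return quotas[path]
        path = path.rsplit("/", 1)[0]
    return None
```

In the example, describe on /config/users/user-1 returns both entries, while resolve returns only 200, the value bound to that exact entity.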

Thanks,

Anna


On Fri, Jan 24, 2020 at 11:32 AM Brian Byrne  wrote:

> My apologies, Rajini. My hasty edits omitted a couple spots. I've done a
> more thorough scan and should have cleaned up (1) and (2).
>
> For (3), Longs are chosen because that's what the ConfigCommand currently
> uses, and because there's no floating point type in the RPC protocol. Longs
> didn't seem to be an issue for bytes-per-second values, and
> request_percentage is normalized [0-100], but you're right in that the
> extensions might require this.
>
> To make Double compatible with the RPC protocol, we'd need to serialize the
> value into a String, and then validate the value on the receiving end.
> Would that be acceptable?
>
> Thanks,
> Brian
>
> On Fri, Jan 24, 2020 at 11:07 AM Rajini Sivaram 
> wrote:
>
> > Thanks Brian. Looks good.
> >
> > Just a few minor points:
> >
> > 1) We can remove *public ResolveClientQuotasOptions
> > setOmitOverriddenValues(boolean omitOverriddenValues); *
> > 2) Under ClientQuotasCommand, the three items are List/Describe/Alter,
> > rename to match the new naming for operations?
> > 3) Request quota configs are doubles rather than long. And for
> > ClientQuotaCallback API, we used doubles everywhere. Wasn't sure if we
> > deliberately chose Longs for this API. if so, we should mention why under
> > Rejected Alternatives. We actually use request quotas < 1 in integration
> > tests to ensure we can throttle easily.
> >
> >
> >
> > On Fri, Jan 24, 2020 at 5:28 PM Brian Byrne  wrote:
> >
> > > Thanks again, Rajini,
> > >
> > > Units will have to be implemented on a per-config basis, then. I've
> > removed
> > > all language reference to units and replaced QuotaKey -> String (config
> > > name). I've also renamed DescribeEffective -> Resolve, and replaced
> > --list
> > > with --describe, and --describe to --resolve to be consistent with the
> > > config command and clear about what functionality is "new".
> > >
> > > Thanks,
> > > Brian
> > >
> > > On Fri, Jan 24, 2020 at 2:27 AM Rajini Sivaram <
> rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Hi Brian,
> > > >
> > > > Thanks for the responses.
> > > >
> > > > 4) Yes, agree that it would be simpler to leave units out of the
> > initial
> > > > design. We currently have units that are interpreted by the
> > configurable
> > > > callback. The default callback interprets the value as
> > > > per-broker-bytes-per-second and per-broker-percentage-cores. But
> > > callbacks
> > > > using partition-based throughput quotas for example would interpret
> the
> > > > value as cluster-wide-bytes-per-second. We could update callbacks to
> > work
> > > > with units, but as you said, it may be better to leave it out
> initially
> > > and
> > > > address later.
> > > >
> > > >
> > > >
> > > > On Thu, Jan 23, 2020 at 6:29 PM Brian Byrne 
> > wrote:
> > > >
> > > > > Thanks Rajini,
> > > > >
> > > > > 1) Good catch, fixed.
> > > > >
> > > > > 2) You're right. We'd need to extend ClientQuotaCallback#quotaLimit
> > or
> > > > add
> > > > > an alternate function. For the sake of an initial implementation,
> I'm
> > > > going
> > > > > to remove '--show-overridden', and a subsequent KIP will have to
> > > propose
> > > > an
> > > > > extension to ClientQuotaCallback to return more detailed information.
> > > > >
> > > > > 3) You're correct. I've removed the default.
> > > > >
> > > > > 4) The idea of the first iteration is be compatible with the
> existing
> > > > API,
> > > > > so no modification to start. The APIs should be kept consistent.
> If a
> > > > user
> > > > > wants to add custom functionality, say an entity type, they'll need
> > to
> > > update their ConfigEntityType anyway, and the quotas APIs are
> meant
> > to
> > > > > handle that gracefully by accepting a String which can be
> propagated.
> > > > >
> > > > > The catch is 'units'. Part of the reason for having a default unit
> > was
> > > > for
> > > > > backwards compatibility, but maybe it's best to leave units out of
> > the
> > > > > initial design. This might lead to adding more configuration
> entries,
> > > but
> > > > > it's also the most flexible option. Thoughts?
> > > > >
> > > > > Thanks,
> > > > > Brian
> > > > >
> > > > >
> > > > > On Thu, Jan 23, 2020 at 4:57 AM Rajini Sivaram <
> > > 

[jira] [Reopened] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-08-26 Thread Anna Povzner (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner reopened KAFKA-8800:
-
  Assignee: Anastasia Vela  (was: Lee Dongjin)

> Flaky Test 
> SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> --
>
> Key: KAFKA-8800
> URL: https://issues.apache.org/jira/browse/KAFKA-8800
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Anastasia Vela
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0, 2.3.1
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
> {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 1 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at 
> kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312) at 
> kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320) at 
> kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
>  at 
> kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8837) KafkaMetricReporterClusterIdTest may not shutdown ZooKeeperTestHarness

2019-08-26 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-8837:
---

 Summary: KafkaMetricReporterClusterIdTest may not shutdown 
ZooKeeperTestHarness
 Key: KAFKA-8837
 URL: https://issues.apache.org/jira/browse/KAFKA-8837
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Anna Povzner
Assignee: Anastasia Vela


@After method in KafkaMetricReporterClusterIdTest calls  
`TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)` before it calls 
tearDown on ZooKeeperTestHarness (which shuts down ZK and the zk client). If 
verifyNonDaemonThreadsStatus asserts, the rest of the resources will not get 
cleaned up.

We should move `TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)` 
to the end of `tearDown()`. However, it would also be good to prevent people from 
using this method in tearDown similarly in the future. Maybe just adding a comment 
would help here.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8782) ReplicationQuotaManagerTest and ClientQuotaManagerTest should close Metrics object

2019-08-09 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8782:
---

 Summary: ReplicationQuotaManagerTest and ClientQuotaManagerTest 
should close Metrics object
 Key: KAFKA-8782
 URL: https://issues.apache.org/jira/browse/KAFKA-8782
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.2.1
Reporter: Anna Povzner


ReplicationQuotaManagerTest and ClientQuotaManagerTest create Metrics objects 
in several tests, but do not close() them in the end. It would be good to 
cleanup resources in those tests, which also helps with reducing overall test 
flakiness.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8526) Broker may select a failed dir for new replica even in the presence of other live dirs

2019-06-11 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8526:
---

 Summary: Broker may select a failed dir for new replica even in 
the presence of other live dirs
 Key: KAFKA-8526
 URL: https://issues.apache.org/jira/browse/KAFKA-8526
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.1, 2.1.1, 2.0.1, 1.1.1, 2.3.0
Reporter: Anna Povzner


Suppose a broker is configured with multiple log dirs. One of the log dirs 
fails, but there is no load on that dir, so the broker does not know about the 
failure yet, _i.e._, the failed dir is still in LogManager#_liveLogDirs. 
Suppose a new topic gets created, and the controller chooses the broker with 
failed log dir to host one of the replicas. The broker gets LeaderAndIsr 
request with isNew flag set. LogManager#getOrCreateLog() selects a log dir for 
the new replica from _liveLogDirs, then one of two things can happen:
1) getAbsolutePath can fail, in which case getOrCreateLog will throw an 
IOException
2) Creating the directory for the new replica log may fail (_e.g._, if the 
directory becomes read-only, while getAbsolutePath worked). 

In both cases, the selected dir will be marked offline (which is correct). 
However, LeaderAndIsr will return an error and replica will be marked offline, 
even though the broker may have other live dirs. 

*Proposed solution*: Broker should retry selecting a dir for the new replica, 
if initially selected dir threw an IOException when trying to create a 
directory for the new replica. We should be able to do that in 
LogManager#getOrCreateLog() method, but keep in mind that 
logDirFailureChannel.maybeAddOfflineLogDir does not synchronously remove the 
dir from _liveLogDirs. So, it makes sense to select initial dir by calling 
LogManager#nextLogDir (current implementation), but if we fail to create log on 
that dir, one approach is to select next dir from _liveLogDirs in round-robin 
fashion (until we get to initial log dir – the case where all dirs failed).
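The proposed retry loop might look like this (an illustrative Python sketch of the round-robin idea, not the actual LogManager code; names are invented):

```python
def create_log_with_retry(live_dirs, start_index, try_create):
    """Try the initially selected dir; on IOError, advance round-robin
    through the remaining live dirs, failing only when every dir failed."""
    n = len(live_dirs)
    failed = []
    for offset in range(n):
        d = live_dirs[(start_index + offset) % n]
        try:
            return try_create(d), failed
        except IOError:
            failed.append(d)  # would be marked offline via LogDirFailureChannel
    raise IOError("all %d log dirs failed: %s" % (n, failed))
```

Only when the loop wraps all the way around does the replica actually need to be marked offline, which matches the "all dirs failed" case above.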



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8481) Clients may fetch incomplete set of topic partitions just after topic is created

2019-06-04 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8481:
---

 Summary: Clients may fetch incomplete set of topic partitions just 
after topic is created
 Key: KAFKA-8481
 URL: https://issues.apache.org/jira/browse/KAFKA-8481
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.1
Reporter: Anna Povzner


KafkaConsumer#partitionsFor() or AdminClient#describeTopics() may return 
incomplete set of partitions for the given topic if the topic just got created.

Cause: When a topic gets created, in most cases, the controller sends partitions of 
this topic via several UpdateMetadataRequests (vs. one UpdateMetadataRequest 
with all partitions). First UpdateMetadataRequest contains partitions for which 
this broker hosts replicas, and then one or more UpdateMetadataRequest for the 
remaining partitions. This means that if a broker gets topic metadata requests 
between first and last UpdateMetadataRequest, the response will contain only 
subset of topic partitions.

Proposed fix: In KafkaController#processTopicChange(), before calling 
OnNewPartitionCreation(), send an UpdateMetadataRequest with partitions of new 
topics (addedPartitionReplicaAssignment) to all live brokers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8480) Clients may fetch incomplete set of topic partitions during cluster startup

2019-06-04 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8480:
---

 Summary: Clients may fetch incomplete set of topic partitions 
during cluster startup
 Key: KAFKA-8480
 URL: https://issues.apache.org/jira/browse/KAFKA-8480
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.1
Reporter: Anna Povzner
Assignee: Anna Povzner


KafkaConsumer#partitionsFor() or AdminClient#describeTopics() may not return 
all partitions for a given topic when the cluster is starting up (after the 
cluster was down). 

The cause is the controller, on becoming controller, sending an 
UpdateMetadataRequest for all partitions with at least one online replica, and 
then a separate UpdateMetadataRequest for all partitions with at least one 
offline replica. If a client sends a metadata request in between the broker 
processing those two update metadata requests, the client will get an incomplete 
set of partitions.

Proposed fix: the controller should send one UpdateMetadataRequest (containing 
all partitions) in ReplicaStateMachine#startup().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8002) Replica reassignment to new log dir may not complete if future and current replicas segment files have different base offsets

2019-02-25 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8002:
---

 Summary: Replica reassignment to new log dir may not complete if 
future and current replicas segment files have different base offsets
 Key: KAFKA-8002
 URL: https://issues.apache.org/jira/browse/KAFKA-8002
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: Anna Povzner


Once the future replica catches up to the current replica's log end offset, the 
intended logic is to finish the move (and rename the future dir to the current 
replica dir, etc.). However, the check in 
Partition.maybeReplaceCurrentWithFutureReplica compares the whole 
LogOffsetMetadata rather than just the log end offset. The resulting behavior is 
that the reassignment will not finish for topic partitions that were cleaned/ 
compacted such that the base offset of the last segment differs between the 
current and future replica. 

The proposed fix is to compare only log end offsets of the current and future 
replica.
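The proposed fix amounts to the following (illustrative sketch; names are mine, not the Scala code's):

```python
def move_can_complete(current_log_end_offset, future_log_end_offset):
    """Proposed check: compare plain log end offsets only. The buggy check
    compared the full LogOffsetMetadata, which also embeds the last segment's
    base offset, so compacted current/future replicas never matched even when
    the future replica was fully caught up."""
    return future_log_end_offset == current_log_end_offset
```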



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8001) Fetch from future replica stalls when local replica becomes a leader

2019-02-25 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-8001:
---

 Summary: Fetch from future replica stalls when local replica 
becomes a leader
 Key: KAFKA-8001
 URL: https://issues.apache.org/jira/browse/KAFKA-8001
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: Anna Povzner


With KIP-320, fetch from follower / future replica returns FENCED_LEADER_EPOCH 
if current leader epoch in the request is lower than the leader epoch known to 
the leader (or the local replica, in the case of future replica fetching). When 
the future replica fetches from the local replica and the local replica becomes 
the leader of the partition, the next fetch from the future replica fails with 
FENCED_LEADER_EPOCH, and fetching from the future replica is stopped until the 
next leader change. 

Proposed solution: on local replica leader change, the future replica should 
"become a follower" again and go through the truncation phase. Or we could 
optimize it and just update the partition state of the future replica to reflect 
the updated current leader epoch. 
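A minimal sketch of the optimized variant, with illustrative names rather than the real fetcher classes: when the local replica's epoch is bumped, the future replica's partition state is refreshed too, so its next fetch is not fenced.

```java
// Hypothetical sketch, not Kafka's code: track the epoch the future replica
// fetches with vs. the current leader epoch known to the local replica.
final class FutureReplicaFetchSketch {
    private int futureReplicaEpoch;  // epoch the future replica fetches with
    private int localReplicaEpoch;   // current leader epoch known to the local replica

    FutureReplicaFetchSketch(int epoch) {
        futureReplicaEpoch = epoch;
        localReplicaEpoch = epoch;
    }

    // Current behavior: only the local replica learns the new epoch.
    void localBecomesLeader(int newEpoch) {
        localReplicaEpoch = newEpoch;
    }

    // Proposed fix: propagate the new epoch to the future replica's state too.
    void localBecomesLeaderFixed(int newEpoch) {
        localReplicaEpoch = newEpoch;
        futureReplicaEpoch = newEpoch;
    }

    // The local replica fences a fetch that carries an older epoch.
    boolean nextFetchFenced() {
        return futureReplicaEpoch < localReplicaEpoch;
    }

    public static void main(String[] args) {
        FutureReplicaFetchSketch broken = new FutureReplicaFetchSketch(4);
        broken.localBecomesLeader(5);               // local replica becomes leader
        System.out.println(broken.nextFetchFenced()); // true: fetching stalls

        FutureReplicaFetchSketch fixed = new FutureReplicaFetchSketch(4);
        fixed.localBecomesLeaderFixed(5);
        System.out.println(fixed.nextFetchFenced()); // false: fetching continues
    }
}
```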





[jira] [Created] (KAFKA-7923) Add unit test to verify Kafka-7401 in AK versions >= 2.0

2019-02-12 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7923:
---

 Summary: Add unit test to verify Kafka-7401 in AK versions >= 2.0
 Key: KAFKA-7923
 URL: https://issues.apache.org/jira/browse/KAFKA-7923
 Project: Kafka
  Issue Type: Test
Affects Versions: 2.1.0, 2.0.1
Reporter: Anna Povzner
Assignee: Anna Povzner


KAFKA-7401 affected versions 1.0 and 1.1; it was fixed there and a unit test was 
added. Version 2.0 did not have that bug, because it was fixed as part of 
another change. To make sure we don't regress, we need to add a unit test 
similar to the one added as part of KAFKA-7401.





[jira] [Created] (KAFKA-7786) Fast update of leader epoch may stall partition fetching due to FENCED_LEADER_EPOCH

2019-01-03 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7786:
---

 Summary: Fast update of leader epoch may stall partition fetching 
due to FENCED_LEADER_EPOCH
 Key: KAFKA-7786
 URL: https://issues.apache.org/jira/browse/KAFKA-7786
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
Reporter: Anna Povzner


KIP-320/KAFKA-7395 Added FENCED_LEADER_EPOCH error response to a 
OffsetsForLeaderEpoch request if the epoch in the request is lower than the 
broker's leader epoch. ReplicaFetcherThread builds a OffsetsForLeaderEpoch 
request under _partitionMapLock_, sends the request outside the lock, and then 
processes the response under _partitionMapLock_. The broker may receive 
LeaderAndIsr with the same leader but with the next leader epoch, remove and 
add partition to the fetcher thread (with partition state reflecting the 
updated leader epoch) – all while the OffsetsForLeaderEpoch request (with the 
old leader epoch) is still outstanding, waiting for the lock to process the 
OffsetsForLeaderEpoch response. As a result, the partition gets removed from 
partitionStates and this broker will not fetch for this partition until the 
next LeaderAndIsr, which may take a while. We will see a log message like this:

[2018-12-23 07:23:04,802] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
fetcherId=0] Partition test_topic-17 has an older epoch (7) than the current 
leader. Will await the new LeaderAndIsr state before resuming fetching. 
(kafka.server.ReplicaFetcherThread)

We saw this happen with 
kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.reassign_from_offset_zero=True.
 This test does partition re-assignment while bouncing 2 out of 4 total 
brokers. When the failure happened, each bounced broker was also a controller. 
Because of re-assignment, the controller updates the leader epoch without 
changing the leader on controller change or on broker startup, so we see several 
leader epoch changes without a leader change, which increases the likelihood of 
the race condition described above.

Here are the exact events that happened in this test (around the failure):

We have 4 brokers Brokers 1, 2, 3, 4. Partition re-assignment is started for 
test_topic-17 [2, 4, 1]  —> [3, 1, 2]. At time t0, leader of test_topic-17 is 
broker 2.
 # clean shutdown of broker 3, which is also a controller
 # broker 4 becomes controller, continues re-assignment and updates leader 
epoch for test_topic-17 to 6 (with same leader)
 # broker 2 (leader of test_topic-17) receives new leader epoch: “test_topic-17 
starts at Leader Epoch 6 from offset 1388. Previous Leader Epoch was: 5”
 # broker 3 is started again after clean shutdown
 # controller sees broker 3 startup, and sends LeaderAndIsr(leader epoch 6) to 
broker 3
 # controller updates leader epoch to 7
 # broker 2 (leader of test_topic-17) receives LeaderAndIsr for leader epoch 7: 
“test_topic-17 starts at Leader Epoch 7 from offset 1974. Previous Leader Epoch 
was: 6”
 # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 6 from 
controller: “Added fetcher to broker BrokerEndPoint(id=2) for leader epoch 6” 
and sends OffsetsForLeaderEpoch request to broker 2
 # broker 3 receives LeaderAndIsr for test_topic-17 and leader epoch 7 from 
controller; removes fetcher thread and adds fetcher thread + executes 
AbstractFetcherThread.addPartitions() which updates partition state with leader 
epoch 7
 # broker 3 receives FENCED_LEADER_EPOCH in response to 
OffsetsForLeaderEpoch(leader epoch 6), because the leader received LeaderAndIsr 
for leader epoch 7 before it got OffsetsForLeaderEpoch(leader epoch 6) from 
broker 3. As a result, it removes partition from partitionStates and it does 
not fetch until controller updates leader epoch and sends LeaderAndIsr for this 
partition to broker 3. The test fails, because re-assignment does not finish on 
time (due to broker 3 not fetching).

 

One way to address this is possibly to add more state to PartitionFetchState. 
However, that may introduce other race conditions. A cleaner way, I think, is to 
return the leader epoch in the OffsetsForLeaderEpoch response along with the 
FENCED_LEADER_EPOCH error, and then ignore the error if the partition state 
contains a higher leader epoch. The advantage is less state maintenance, but the 
disadvantage is that it requires bumping the inter-broker protocol.
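The cleaner fix could be sketched roughly as follows, assuming the response carries the leader epoch of the original request (all names here are illustrative, not Kafka's):

```java
// Hypothetical sketch: with the request's leader epoch echoed back in the
// OffsetsForLeaderEpoch response, the fetcher can drop a FENCED_LEADER_EPOCH
// response that answers an already-outdated request.
final class FencedEpochHandling {
    enum Action { REMOVE_PARTITION, IGNORE_STALE_RESPONSE }

    // requestEpoch: the epoch the (possibly outdated) request carried;
    // currentEpoch: the epoch in the fetcher's current partition state.
    static Action onFencedLeaderEpoch(int requestEpoch, int currentEpoch) {
        // If the partition state already moved to a newer epoch, the fenced
        // response is for a stale request and can safely be ignored.
        return currentEpoch > requestEpoch ? Action.IGNORE_STALE_RESPONSE
                                           : Action.REMOVE_PARTITION;
    }

    public static void main(String[] args) {
        // Broker 3 sent OffsetsForLeaderEpoch(epoch 6), then processed LeaderAndIsr(epoch 7).
        System.out.println(onFencedLeaderEpoch(6, 7)); // IGNORE_STALE_RESPONSE: keep fetching
        // A genuinely fenced request still removes the partition until the next LeaderAndIsr.
        System.out.println(onFencedLeaderEpoch(7, 7)); // REMOVE_PARTITION
    }
}
```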





[jira] [Created] (KAFKA-7415) OffsetsForLeaderEpoch may incorrectly respond with undefined epoch causing truncation to HW

2018-09-14 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7415:
---

 Summary: OffsetsForLeaderEpoch may incorrectly respond with 
undefined epoch causing truncation to HW
 Key: KAFKA-7415
 URL: https://issues.apache.org/jira/browse/KAFKA-7415
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 2.0.0
Reporter: Anna Povzner


If the follower's last appended epoch is ahead of the leader's last appended 
epoch, the OffsetsForLeaderEpoch response will incorrectly send 
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), and the follower will truncate to 
HW. This may lead to data loss in some rare cases where 2 back-to-back leader 
elections happen (failure of one leader, followed by quick re-election of the 
next leader due to preferred leader election, so that all replicas are still in 
the ISR, and then failure of the 3rd leader).

The bug is in LeaderEpochFileCache.endOffsetFor(), which returns 
(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET) if the requested leader epoch is 
ahead of the last leader epoch in the cache. The method should return (last 
leader epoch in the cache, LEO) in this scenario.
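A self-contained sketch of the corrected lookup, assuming a simplified epoch-to-start-offset cache (this is an illustration, not the actual LeaderEpochFileCache code):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the endOffsetFor() fix. The cache maps leader epoch
// to the start offset of that epoch; UNDEFINED stands in for
// UNDEFINED_EPOCH / UNDEFINED_EPOCH_OFFSET.
final class EpochCacheSketch {
    static final long UNDEFINED = -1;
    private final TreeMap<Integer, Long> epochToStartOffset = new TreeMap<>();
    private final long logEndOffset;

    EpochCacheSketch(Map<Integer, Long> entries, long logEndOffset) {
        epochToStartOffset.putAll(entries);
        this.logEndOffset = logEndOffset;
    }

    // Returns {leaderEpoch, endOffset} for the requested epoch.
    long[] endOffsetFor(int requestedEpoch) {
        if (epochToStartOffset.isEmpty())
            return new long[]{UNDEFINED, UNDEFINED};
        int lastEpoch = epochToStartOffset.lastKey();
        if (requestedEpoch >= lastEpoch) {
            // The fix: a requested epoch at or ahead of the last cached epoch
            // answers with (last epoch in cache, LEO), not (UNDEFINED, UNDEFINED).
            return new long[]{lastEpoch, logEndOffset};
        }
        // For an older epoch: its end offset is the start of the next cached epoch.
        Map.Entry<Integer, Long> floor = epochToStartOffset.floorEntry(requestedEpoch);
        if (floor == null)
            return new long[]{UNDEFINED, UNDEFINED};
        return new long[]{floor.getKey(),
                          epochToStartOffset.higherEntry(floor.getKey()).getValue()};
    }

    public static void main(String[] args) {
        // New leader has only appended in epoch 0 and has LEO = 10.
        EpochCacheSketch leader = new EpochCacheSketch(Map.of(0, 0L), 10L);
        // Follower asks for the end offset of epoch 1, ahead of the leader's last epoch.
        System.out.println(Arrays.toString(leader.endOffsetFor(1))); // [0, 10], not [-1, -1]
    }
}
```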

 

Here is an example of a scenario where the issue leads to the data loss.

Suppose we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
(r1, r2, r3) and the leader is r1. The data up to offset 10 has been committed 
to the ISR. Here is the initial state:
{code:java}
Leader: r1
leader epoch: 0
ISR(r1, r2, r3)
r1: [hw=10, leo=10]
r2: [hw=8, leo=10]
r3: [hw=5, leo=10]
{code}
Replica 1 fails and leaves the ISR, which makes Replica 2 the new leader with 
leader epoch = 1. The leader appends a batch, but it is not replicated yet to 
the followers.
{code:java}
Leader: r2
leader epoch: 1
ISR(r2, r3)
r1: [hw=10, leo=10]
r2: [hw=8, leo=11]
r3: [hw=5, leo=10]
{code}
Replica 3 is elected a leader (due to preferred leader election) before it has 
a chance to truncate, with leader epoch 2. 
{code:java}
Leader: r3
leader epoch: 2
ISR(r2, r3)
r1: [hw=10, leo=10]
r2: [hw=8, leo=11]
r3: [hw=5, leo=10]
{code}
Replica 2 sends OffsetsForLeaderEpoch(leader epoch = 1) to Replica 3. Replica 3 
incorrectly replies with UNDEFINED_EPOCH_OFFSET, and Replica 2 truncates to HW. 
If Replica 3 fails before Replica 2 re-fetches the data, this may lead to data 
loss.





Re: [VOTE] KIP-320: Allow fetchers to detect and handle log truncation

2018-08-13 Thread Anna Povzner
+1

Thanks for the KIP!

On Thu, Aug 9, 2018 at 5:16 PM Jun Rao  wrote:

> Hi, Jason,
>
> Thanks for the KIP. +1 from me.
>
> Jun
>
> On Wed, Aug 8, 2018 at 1:04 PM, Jason Gustafson 
> wrote:
>
> > Hi All,
> >
> > I'd like to start a vote for KIP-320:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.
> > Thanks to everyone who reviewed the proposal. Please feel free to send
> > additional questions to the discussion thread if you have any.
> >
> > +1 from me (duh)
> >
> > Thanks,
> > Jason
> >
>


Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-26 Thread Anna Povzner
Hi Jason,

Thanks for the update. I agree with the current proposal.

Two minor comments:
1) In “API Changes” section, first paragraph says that “users can catch the
more specific exception type and use the new `seekToNearest()` API defined
below.”. Since LogTruncationException “will include the partitions that
were truncated and the offset of divergence”., shouldn’t the client use
seek(offset) to seek to the offset of divergence in response to the
exception?
2) In “Protocol Changes” section, OffsetsForLeaderEpoch subsection says “Note
that consumers will send a sentinel value (-1) for the current epoch and
the broker will simply disregard that validation.”. Is that still true with
MetadataResponse containing leader epoch?
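For illustration, seeking to the offset of divergence as suggested in (1) might look like the following sketch; the exception type and its contents are simplified stand-ins for whatever shape the KIP's client API ends up taking:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of handling log truncation by seeking to the offset of divergence.
// LogTruncationException and its field here are illustrative stand-ins, not
// the real kafka-clients types.
final class TruncationHandlingSketch {
    static class LogTruncationException extends RuntimeException {
        final Map<String, Long> divergentOffsets;  // partition -> offset of divergence
        LogTruncationException(Map<String, Long> divergentOffsets) {
            this.divergentOffsets = divergentOffsets;
        }
    }

    // In response to the exception, seek each affected partition directly to
    // its divergence offset (rather than a separate seekToNearest() call).
    static Map<String, Long> positionsAfterHandling(Map<String, Long> positions,
                                                    LogTruncationException e) {
        Map<String, Long> updated = new HashMap<>(positions);
        updated.putAll(e.divergentOffsets);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Long> positions = Map.of("topic-0", 120L);
        LogTruncationException e =
            new LogTruncationException(Map.of("topic-0", 100L));
        System.out.println(positionsAfterHandling(positions, e)); // {topic-0=100}
    }
}
```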

Thanks,
Anna

On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson  wrote:

> Hi All,
>
> I have made some updates to the KIP. As many of you know, a side project of
> mine has been specifying the Kafka replication protocol in TLA. You can
> check out the code here if you are interested:
> https://github.com/hachikuji/kafka-specification. In addition to
> uncovering
> a couple unknown bugs in the replication protocol (e.g.
> https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me
> validate the behavior in this KIP. In fact, the original version I proposed
> had a weakness. I initially suggested letting the leader validate the
> expected epoch at the fetch offset. This made sense for the consumer in the
> handling of unclean leader election, but it was not strong enough to
> protect the follower in all cases. In order to make advancement of the high
> watermark safe, for example, the leader actually needs to be sure that
> every follower in the ISR matches its own epoch.
>
> I attempted to fix this problem by treating the epoch in the fetch request
> slightly differently for consumers and followers. For consumers, it would
> be the expected epoch of the record at the fetch offset, and the leader
> would raise a LOG_TRUNCATION error if the expectation failed. For
> followers, it would be the current epoch and the leader would require that
> it match its own epoch. This was unsatisfying both because of the
> inconsistency in behavior and because the consumer was left with the weaker
> fencing that we already knew was insufficient for the replicas. Ultimately
> I decided that we should make the behavior consistent and that meant that
> the consumer needed to act more like a following replica. Instead of
> checking for truncation while fetching, the consumer should check for
> truncation after leader changes. After checking for truncation, the
> consumer can then use the current epoch when fetching and get the stronger
> protection that it provides. What this means is that the Metadata API must
> include the current leader epoch. Given the problems we have had around
> stale metadata and how challenging they have been to debug, I'm convinced
> that this is a good idea in any case and it resolves the inconsistent
> behavior in the Fetch API. The downside is that there will be some
> additional overhead upon leader changes, but I don't think it is a major
> concern since leader changes are rare and the OffsetForLeaderEpoch request
> is cheap.
>
> This approach leaves the door open for some interesting follow up
> improvements. For example, now that we have the leader epoch in the
> Metadata request, we can implement similar fencing for the Produce API. And
> now that the consumer can reason about truncation, we could consider having
> a configuration to expose records beyond the high watermark. This would let
> users trade lower end-to-end latency for weaker durability semantics. It is
> sort of like having an acks=0 option for the consumer. Neither of these
> options are included in this KIP, I am just mentioning them as potential
> work for the future.
>
> Finally, based on the discussion in this thread, I have added the
> seekToCommitted API for the consumer. Please take a look and let me know
> what you think.
>
> Thanks,
> Jason
>
> On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang  wrote:
>
> > Hi Jason,
> >
> > The proposed API seems reasonable to me too. Could you please also update
> > the wiki page (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > with a section say "workflow" on how the proposed API will be co-used
> with
> > others to:
> >
> > 1. consumer callers handling a LogTruncationException.
> > 2. consumer internals for handling a retriable
> UnknownLeaderEpochException.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner 
> wrote:
> >
> > > Hi Jason,
>

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-17 Thread Anna Povzner
Hi Jason,


I also like your proposal and agree that KafkaConsumer#seekToCommitted() is
more intuitive as a way to initialize both consumer's position and its
fetch state.


My understanding that KafkaConsumer#seekToCommitted() is purely for clients
who store their offsets externally, right? And we are still going to
add KafkaConsumer#findOffsets()
in this KIP as we discussed, so that the client can handle
LogTruncationException?


Thanks,

Anna


On Thu, Jul 12, 2018 at 3:57 PM Dong Lin  wrote:

> Hey Jason,
>
> It is a great summary. The solution sounds good. I might have minor
> comments regarding the method name. But we can discuss that minor points
> later after we reach consensus on the high level API.
>
> Thanks,
> Dong
>
>
> On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson 
> wrote:
>
> > Hey Anna and Dong,
> >
> > Thanks a lot for the great discussion. I've been hanging back a bit
> because
> > honestly the best option hasn't seemed clear. I agree with Anna's general
> > observation that there is a distinction between the position of the
> > consumer and its fetch state up to that position. If you think about it,
> a
> > committed offset actually represents both of these. The metadata is used
> to
> > initialize the state of the consumer application and the offset
> initializes
> > the position. Additionally, we are extending the offset commit in this
> KIP
> > to also include the last epoch fetched by the consumer, which is used to
> > initialize the internal fetch state. Of course if you do an arbitrary
> > `seek` and immediately commit offsets, then there won't be a last epoch
> to
> > commit. This seems intuitive since there is no fetch state in this case.
> We
> > only commit fetch state when we have it.
> >
> > So if we think about a committed offset as initializing both the
> consumer's
> > position and its fetch state, then the gap in the API is evidently that
> we
> > don't have a way to initialize the consumer to a committed offset. We do
> it
> > implicitly of course for offsets stored in Kafka, but since external
> > storage is a use case we support, then we should have an explicit API as
> > well. Perhaps something like this:
> >
> > seekToCommitted(TopicPartition, OffsetAndMetadata)
> >
> > In this KIP, we are proposing to allow the `OffsetAndMetadata` object to
> > include the leader epoch, so I think this would have the same effect as
> > Anna's suggested `seekToRecord`. But perhaps it is a more natural fit
> given
> > the current API? Furthermore, if we find a need for additional metadata
> in
> > the offset commit API in the future, then we will just need to modify the
> > `OffsetAndMetadata` object and we will not need a new `seek` API.
> >
> > With this approach, I think then we can leave the `position` API as it
> is.
> > The position of the consumer is still just the next expected fetch
> offset.
> > If a user needs to record additional state based on previous fetch
> > progress, then they would use the result of the previous fetch to obtain
> > it. This makes the dependence on fetch progress explicit. I think we
> could
> > make this a little more convenience with a helper in the
> `ConsumerRecords`
> > object, but I think that's more of a nice-to-have.
> >
> > Thoughts?
> >
> > By the way, I have been iterating a little bit on the replica side of
> this
> > KIP. My initial proposal in fact did not have strong enough fencing to
> > protect all of the edge cases. I believe the current proposal fixes the
> > problems, but I am still verifying the model.
> >
> > Thanks,
> > Jason
> >
> >
> > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin  wrote:
> >
> > > Hey Anna,
> > >
> > > Thanks much for the explanation. Approach 1 also sounds good to me. I
> > think
> > > findOffsets() is useful for users who don't use automatic offset reset
> > > policy.
> > >
> > > Just one more question. Since users who store offsets externally need
> to
> > > provide leaderEpoch to findOffsets(...), do we need an extra API for
> user
> > > to get both offset and leaderEpoch, e.g. recordPosition()?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner 
> > wrote:
> > >
> > > > Hi Dong,
> > > >
> > > >
> > > > What I called “not covering all use cases” is what you call
> best-effort
> > > > (not guaranteeing some corner cases). I think we are on the same page
>

[jira] [Created] (KAFKA-7151) Broker running out of disk space may result in state where unclean leader election is required

2018-07-11 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7151:
---

 Summary: Broker running out of disk space may result in state 
where unclean leader election is required
 Key: KAFKA-7151
 URL: https://issues.apache.org/jira/browse/KAFKA-7151
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


We have seen situations like the following:

1) Broker A is a leader for topic partition, and brokers B and C are the 
followers

2) Broker A is running out of disk space, shrinks ISR only to itself, and then 
sometime later gets disk errors, etc.

3) Broker A is stopped, disk space is reclaimed, and broker A is restarted

Result: Broker A becomes a leader, but followers cannot fetch because their log 
is ahead. The only way to continue is to enable unclean leader election.

 

There are several issues here:

-- if the machine is running out of disk space, we do not reliably get an error 
from the file system as soon as that happens. The broker could be in a state 
where some writes succeed (possibly if the write is not flushed to disk) and 
some writes fail, or fail later. This may cause fetchers to fetch records that 
are still in the leader's file system cache; if the flush to disk then fails on 
the leader, the followers end up ahead of the leader.

-- I am not sure exactly why, but it seems like the leader broker (that is 
running out of disk space) may also stop servicing fetch requests, making 
followers fall behind and get kicked out of the ISR.

Ideally, the broker should stop being a leader for any topic partition before 
accepting any records that may fail to be flushed to disk. One option is to 
automatically detect disk space usage and make a broker read-only for topic 
partitions if disk usage reaches 80% or so. Maybe there is a better option.  
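A rough sketch of such a threshold check; the 80% cutoff and all names are illustrative, not an existing broker config:

```java
import java.io.File;

// Hypothetical sketch: reject appends (i.e., make the broker read-only for its
// leader partitions) once the log dir's disk usage crosses a threshold.
final class DiskUsageGuard {
    static boolean shouldRejectAppends(long totalBytes, long usableBytes,
                                       double maxUsedFraction) {
        double usedFraction = 1.0 - (double) usableBytes / totalBytes;
        return usedFraction >= maxUsedFraction;
    }

    public static void main(String[] args) {
        File logDir = new File(".");  // stand-in for the broker's log directory
        boolean readOnly = shouldRejectAppends(
            logDir.getTotalSpace(), logDir.getUsableSpace(), 0.80);
        System.out.println("reject appends: " + readOnly);
    }
}
```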

 





[jira] [Created] (KAFKA-7150) Error in processing fetched data for one partition may stop follower fetching other partitions

2018-07-11 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7150:
---

 Summary: Error in processing fetched data for one partition may 
stop follower fetching other partitions
 Key: KAFKA-7150
 URL: https://issues.apache.org/jira/browse/KAFKA-7150
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 1.1.0, 1.0.2, 0.11.0.3, 0.10.2.2
Reporter: Anna Povzner


If a follower fails to process data for one topic partition, e.g. due to an 
out-of-order offsets error, the whole ReplicaFetcherThread is killed, which also 
stops fetching for the other topic partitions serviced by this fetcher thread. 
This may result in unnecessary under-replicated partitions. I think it would be 
better to continue fetching for the other topic partitions, and just remove the 
partition with an error from the responsibility of the fetcher thread.
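A minimal sketch of the suggested error isolation, with plain strings standing in for topic partitions and a success flag standing in for per-partition processing results:

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not Kafka's code: on a per-partition processing error,
// drop only the failing partition from the fetcher's responsibility instead of
// letting the exception kill the whole fetcher thread.
final class FetcherErrorIsolation {
    private final Set<String> partitions = new LinkedHashSet<>();

    FetcherErrorIsolation(Set<String> assigned) {
        partitions.addAll(assigned);
    }

    // resultsOk: partition -> whether its fetched data was processed successfully
    // (false models e.g. an out-of-order offsets error). Returns the partitions
    // removed from this fetcher; the rest keep being fetched.
    Set<String> processFetchedData(Map<String, Boolean> resultsOk) {
        Set<String> failed = new LinkedHashSet<>();
        for (Map.Entry<String, Boolean> e : resultsOk.entrySet())
            if (!e.getValue()) failed.add(e.getKey());
        partitions.removeAll(failed);
        return failed;
    }

    Set<String> fetchedPartitions() {
        return partitions;
    }

    public static void main(String[] args) {
        FetcherErrorIsolation fetcher =
            new FetcherErrorIsolation(Set.of("t-0", "t-1", "t-2"));
        fetcher.processFetchedData(Map.of("t-0", true, "t-1", false, "t-2", true));
        // Only t-1 is dropped; t-0 and t-2 stay replicated by this thread.
        System.out.println(fetcher.fetchedPartitions());
    }
}
```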





Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-11 Thread Anna Povzner
Hi Dong,


What I called “not covering all use cases” is what you call best-effort
(not guaranteeing some corner cases). I think we are on the same page here.


I wanted to be clear in the API whether the consumer seeks to a position
(offset) or to a record (offset, leader epoch). The only use-case of
seeking to a record is seeking to a committed offset for a user who stores
committed offsets externally. (Unless users find some other reason to seek
to a record.) I thought it was possible to provide this functionality with
findOffsets(offset, leader epoch) followed by a seek(offset). However, you
are right that this will not handle the race condition where the non-divergent
offset found by findOffsets() could change again before the consumer does
the first fetch.


Regarding position() — if we add position that returns (offset, leader
epoch), this is specifically a position after a record that was actually
consumed or position of a committed record. In which case, I still think
it’s cleaner to get a record position of consumed message from a new helper
method in ConsumerRecords() or from committed offsets.


I think all the use-cases could be then covered with:

(Approach 1)

seekToRecord(offset, leaderEpoch) — this will just initialize/set the
consumer state;

findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}


If we agree that the race condition is also a corner case, then I think we
can cover use-cases with:

(Approach 2)

findOffsets(offset, leaderEpoch) returns offset — we still want leader
epoch as a parameter for the users who store their committed offsets
externally.


I am actually now leaning more to approach 1, since it is more explicit,
and maybe there are more use cases for it.


Thanks,

Anna


On Tue, Jul 10, 2018 at 3:47 PM Dong Lin  wrote:

> Hey Anna,
>
> Thanks for the comment. To answer your question, it seems that we can cover
> all case in this KIP. As stated in "Consumer Handling" section, KIP-101
> based approach will be used to derive the truncation offset from the
> 2-tuple (offset, leaderEpoch). This approach is best effort and it is
> inaccurate only in very rare scenarios (as described in KIP-279).
>
> By using seek(offset, leaderEpoch), consumer will still be able to follow
> this best-effort approach to detect log truncation and determine the
> truncation offset. On the other hand, if we use seek(offset), consumer will
> not detect log truncation in some cases which weakens the guarantee of this
> KIP. Does this make sense?
>
> Thanks,
> Dong
>
> On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner  wrote:
>
> > Sorry, I hit "send" before finishing. Continuing...
> >
> >
> > 2) Hiding most of the consumer handling log truncation logic with minimal
> > exposure in KafkaConsumer API.  I was proposing this path.
> >
> >
> > Before answering your specific questions… I want to answer to your
> comment
> > “In general, maybe we should discuss the final solution that covers all
> > cases?”. With current KIP, we don’t cover all cases of consumer detecting
> > log truncation because the KIP proposes a leader epoch cache in consumer
> > that does not persist across restarts. Plus, we only store last committed
> > offset (either internally or users can store externally). This has a
> > limitation that the consumer will not always be able to find point of
> > truncation just because we have a limited history (just one data point).
> >
> >
> > So, maybe we should first agree on whether we accept that storing last
> > committed offset/leader epoch has a limitation that the consumer will not
> > be able to detect log truncation in all cases?
> >
> >
> > Thanks,
> >
> > Anna
> >
> > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner  wrote:
> >
> > > Hi Dong,
> > >
> > > Thanks for the follow up! I finally have much more clear understanding
> of
> > > where you are coming from.
> > >
> > > You are right. The success of findOffsets()/finding a point of
> > > non-divergence depends on whether we have enough entries in the
> > consumer's
> > > leader epoch cache. However, I think this is a fundamental limitation
> of
> > > having a leader epoch cache that does not persist across consumer
> > restarts.
> > >
> > > If we consider the general case where consumer may or may not have this
> > > cache, then I see two paths:
> > > 1) Letting the user to track the leader epoch history externally, and
> > have
> > > more exposure to leader epoch and finding point of non-divergence in
> > > KafkaConsumer API. I understand this is the case you were talking
> about.
> > >
> > >
> >

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Sorry, I hit "send" before finishing. Continuing...


2) Hiding most of the consumer handling log truncation logic with minimal
exposure in KafkaConsumer API.  I was proposing this path.


Before answering your specific questions… I want to answer to your comment
“In general, maybe we should discuss the final solution that covers all
cases?”. With current KIP, we don’t cover all cases of consumer detecting
log truncation because the KIP proposes a leader epoch cache in consumer
that does not persist across restarts. Plus, we only store last committed
offset (either internally or users can store externally). This has a
limitation that the consumer will not always be able to find point of
truncation just because we have a limited history (just one data point).


So, maybe we should first agree on whether we accept that storing last
committed offset/leader epoch has a limitation that the consumer will not
be able to detect log truncation in all cases?


Thanks,

Anna

On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner  wrote:

> Hi Dong,
>
> Thanks for the follow up! I finally have much more clear understanding of
> where you are coming from.
>
> You are right. The success of findOffsets()/finding a point of
> non-divergence depends on whether we have enough entries in the consumer's
> leader epoch cache. However, I think this is a fundamental limitation of
> having a leader epoch cache that does not persist across consumer restarts.
>
> If we consider the general case where consumer may or may not have this
> cache, then I see two paths:
> 1) Letting the user to track the leader epoch history externally, and have
> more exposure to leader epoch and finding point of non-divergence in
> KafkaConsumer API. I understand this is the case you were talking about.
>
>
>
> On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:
>
>> Hey Anna,
>>
>> Thanks much for your detailed explanation and example! It does help me
>> understand the difference between our understanding.
>>
>> So it seems that the solution based on findOffsets() currently focuses
>> mainly on the scenario that consumer has cached leaderEpoch -> offset
>> mapping whereas I was thinking about the general case where consumer may
>> or
>> may not have this cache. I guess that is why we have different
>> understanding here. I have some comments below.
>>
>>
>> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed
>> by
>> seek(offset) works if consumer has the cached leaderEpoch -> offset
>> mapping. But if we assume consumer has this cache, do we need to have
>> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
>> can also derive the leaderEpoch using offset just like the proposed
>> solution does with seek(offset).
>>
>>
>> 4) If consumer does not have cached leaderEpoch -> offset mapping, which
>> is
>> the case if consumer is restarted on a new machine, then it is not clear
>> what leaderEpoch would be included in the FetchRequest if consumer does
>> seek(offset). This is the case that motivates the first question of the
>> previous email. In general, maybe we should discuss the final solution
>> that
>> covers all cases?
>>
>>
>> 5) The second question in my previous email is related to the following
>> paragraph:
>>
>> "... In some cases, offsets returned from position() could be actual
>> consumed messages by this consumer identified by {offset, leader epoch}.
>> In
>> other cases, position() returns offset that was not actually consumed.
>> Suppose, the user calls position() for the last offset...".
>>
>> I guess my point is that, if user calls position() for the last offset and
>> uses that offset in seek(...), then user can probably just call
>> Consumer#seekToEnd() without calling position() and seek(...). Similarly
>> user can call Consumer#seekToBeginning() to the seek to the earliest
>> position without calling position() and seek(...). Thus position() only
>> needs to return the actual consumed messages identified by {offset, leader
>> epoch}. Does this make sense?
>>
>>
>> Thanks,
>> Dong
>>
>>
>> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner  wrote:
>>
>> > Hi Dong,
>> >
>> >
>> > Thanks for considering my suggestions.
>> >
>> >
>> > Based on your comments, I realized that my suggestion was not complete
>> with
>> > regard to KafkaConsumer API vs. consumer-broker protocol. While I
>> propose
>> > to keep KafkaConsumer#seek() unchanged and take offset only, the
>> underlying
>> > 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-10 Thread Anna Povzner
Hi Dong,

Thanks for the follow up! I finally have much more clear understanding of
where you are coming from.

You are right. The success of findOffsets()/finding a point of
non-divergence depends on whether we have enough entries in the consumer's
leader epoch cache. However, I think this is a fundamental limitation of
having a leader epoch cache that does not persist across consumer restarts.

If we consider the general case where consumer may or may not have this
cache, then I see two paths:
1) Letting the user to track the leader epoch history externally, and have
more exposure to leader epoch and finding point of non-divergence in
KafkaConsumer API. I understand this is the case you were talking about.



On Tue, Jul 10, 2018 at 12:16 PM Dong Lin  wrote:

> Hey Anna,
>
> Thanks much for your detailed explanation and example! It does help me
> understand the difference between our understanding.
>
> So it seems that the solution based on findOffsets() currently focuses
> mainly on the scenario that consumer has cached leaderEpoch -> offset
> mapping whereas I was thinking about the general case where consumer may or
> may not have this cache. I guess that is why we have different
> understanding here. I have some comments below.
>
>
> 3) The proposed solution using findOffsets(offset, leaderEpoch) followed by
> seek(offset) works if consumer has the cached leaderEpoch -> offset
> mapping. But if we assume consumer has this cache, do we need to have
> leaderEpoch in the findOffsets(...)? Intuitively, the findOffsets(offset)
> can also derive the leaderEpoch using offset just like the proposed
> solution does with seek(offset).
>
>
> 4) If consumer does not have cached leaderEpoch -> offset mapping, which is
> the case if consumer is restarted on a new machine, then it is not clear
> what leaderEpoch would be included in the FetchRequest if consumer does
> seek(offset). This is the case that motivates the first question of the
> previous email. In general, maybe we should discuss the final solution that
> covers all cases?
>
>
> 5) The second question in my previous email is related to the following
> paragraph:
>
> "... In some cases, offsets returned from position() could be actual
> consumed messages by this consumer identified by {offset, leader epoch}. In
> other cases, position() returns offset that was not actually consumed.
> Suppose, the user calls position() for the last offset...".
>
> I guess my point is that, if user calls position() for the last offset and
> uses that offset in seek(...), then user can probably just call
> Consumer#seekToEnd() without calling position() and seek(...). Similarly
> user can call Consumer#seekToBeginning() to the seek to the earliest
> position without calling position() and seek(...). Thus position() only
> needs to return the actual consumed messages identified by {offset, leader
> epoch}. Does this make sense?
>
>
> Thanks,
> Dong
>
>
> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner  wrote:
>
> > Hi Dong,
> >
> >
> > Thanks for considering my suggestions.
> >
> >
> > Based on your comments, I realized that my suggestion was not complete
> with
> > regard to KafkaConsumer API vs. consumer-broker protocol. While I propose
> > to keep KafkaConsumer#seek() unchanged and take offset only, the
> underlying
> > consumer will send the next FetchRequest() to broker with offset and
> > leaderEpoch if it is known (based on leader epoch cache in consumer) —
> note
> > that this is different from the current KIP, which suggests to always
> send
> > unknown leader epoch after seek(). This way, if the consumer and a broker
> > agreed on the point of non-divergence, which is some {offset,
> leaderEpoch}
> > pair, the new leader which causes another truncation (even further back)
> > will be able to detect new divergence and restart the process of finding
> > the new point of non-divergence. So, to answer your question, If the
> > truncation happens just after the user calls
> > KafkaConsumer#findOffsets(offset, leaderEpoch) followed by seek(offset),
> > the user will not seek to the wrong position without knowing that
> > truncation has happened, because the consumer will get another truncation
> > error, and seek again.
> >
> >
> > I am afraid, I did not understand your second question. Let me summarize
> my
> > suggestions again, and then give an example to hopefully make my
> > suggestions more clear. Also, the last part of my example shows how the
> > use-case in your first question will work. If it does not answer your
> > second question, would you mind clarifying? I am also focusing on the
> case
> 

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-09 Thread Anna Povzner
k(offset=1) followed by
poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A,
which responds with message at offset 1, leader epoch 3.


I will think some more about consumers restarting from committed offsets,
and send a follow up.


Thanks,

Anna


On Sat, Jul 7, 2018 at 1:36 AM Dong Lin  wrote:

> Hey Anna,
>
> Thanks much for the thoughtful reply. It makes sense to differentiate between
> "seeking to a message" and "seeking to a position". I have two questions
> here:
>
> - For "seeking to a message" use-case, with the proposed approach user
> needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If
> message truncation and message append happen immediately after
> findOffset(offset,
> leaderEpoch) but before seek(offset), it seems that user will seek to the
> wrong message without knowing the truncation has happened. Would this be a
> problem?
>
> - For "seeking to a position" use-case, it seems that there can be two
> positions, i.e. earliest and latest. So these two cases can be
> fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it
> seems that user will only need to call position() and seek() for "seeking
> to a message" use-case?
>
> Thanks,
> Dong
>
>
> On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner  wrote:
>
> > Hi Jason and Dong,
> >
> >
> > I’ve been thinking about your suggestions and discussion regarding
> > position(), seek(), and new proposed API.
> >
> >
> > Here is my thought process why we should keep position() and seek() API
> > unchanged.
> >
> >
> > I think we should separate {offset, leader epoch} that uniquely
> identifies
> > a message from an offset that is a position. In some cases, offsets
> > returned from position() could be actual consumed messages by this
> consumer
> > identified by {offset, leader epoch}. In other cases, position() returns
> > offset that was not actually consumed. Suppose, the user calls position()
> > for the last offset. Suppose we return {offset, leader epoch} of the
> > message currently in the log. Then, the message gets truncated before
> > consumer’s first poll(). It does not make sense for poll() to fail in
> this
> > case, because the log truncation did not actually happen from the
> consumer
> > perspective. On the other hand, as the KIP proposes, it makes sense for
> the
> > committed() method to return {offset, leader epoch} because those offsets
> > represent actual consumed messages.
> >
> >
> > The same argument applies to the seek() method — we are not seeking to a
> > message, we are seeking to a position.
> >
> >
> > I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
> > something like:
> >
> > Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch>
> > offsetsToSearch)
> >
> > Similar to seek() and position(), I think findOffsets() should return
> > offset without leader epoch, because what we want is the offset that we
> > think is closest to the not divergent message from the given consumed
> > message. Until the consumer actually fetches the message, we should not
> let
> > the consumer store the leader epoch for a message it did not consume.
> >
> >
> > So, the workflow will be:
> >
> > 1) The user gets LogTruncationException with {offset, leader epoch of the
> > previous message} (whatever we send with new FetchRecords request).
> >
> > 2) offset = findOffsets(tp -> {offset, leader epoch})
> >
> > 3) seek(offset)
> >
> >
> > For the use-case where the users store committed offsets externally:
> >
> > 1) Such users would have to track the leader epoch together with an
> offset.
> > Otherwise, there is no way to detect later what leader epoch was
> associated
> > with the message. I think it’s reasonable to ask that from users if they
> > want to detect log truncation. Otherwise, they will get the current
> > behavior.
> >
> >
> > If the users currently get an offset to be stored using position(), I see
> > two possibilities. First, they save the offset returned from position(),
> > which they call before poll(). In that case, it would not be correct to
> > store {offset, leader epoch} if we would have changed position() to
> return
> > {offset, leader epoch} since actual fetched message could be different
> > (from the example I described earlier). So, it would be more correct to
> > call position() after poll(). However, the user already gets
> > ConsumerRecords at this point, from which the user can extract {offset,
> > leader e

Re: [DISCUSS] KIP-320: Allow fetchers to detect and handle log truncation

2018-07-04 Thread Anna Povzner
Hi Jason and Dong,


I’ve been thinking about your suggestions and discussion regarding
position(), seek(), and new proposed API.


Here is my thought process why we should keep position() and seek() API
unchanged.


I think we should separate {offset, leader epoch} that uniquely identifies
a message from an offset that is a position. In some cases, offsets
returned from position() could be actual consumed messages by this consumer
identified by {offset, leader epoch}. In other cases, position() returns
offset that was not actually consumed. Suppose, the user calls position()
for the last offset. Suppose we return {offset, leader epoch} of the
message currently in the log. Then, the message gets truncated before
consumer’s first poll(). It does not make sense for poll() to fail in this
case, because the log truncation did not actually happen from the consumer
perspective. On the other hand, as the KIP proposes, it makes sense for the
committed() method to return {offset, leader epoch} because those offsets
represent actual consumed messages.


The same argument applies to the seek() method — we are not seeking to a
message, we are seeking to a position.


I like the proposal to add KafkaConsumer#findOffsets() API. I am assuming
something like:

Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch>
offsetsToSearch)

Similar to seek() and position(), I think findOffsets() should return
offset without leader epoch, because what we want is the offset that we
think is closest to the not divergent message from the given consumed
message. Until the consumer actually fetches the message, we should not let
the consumer store the leader epoch for a message it did not consume.


So, the workflow will be:

1) The user gets LogTruncationException with {offset, leader epoch of the
previous message} (whatever we send with new FetchRecords request).

2) offset = findOffsets(tp -> {offset, leader epoch})

3) seek(offset)
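As an illustration of step 2, here is a toy model of the kind of lookup
findOffsets() could perform against the leader's epoch history. All names here
(FindOffsetsSketch, findOffset, the map-based epoch history) are illustrative
assumptions, not the proposed KafkaConsumer API:

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model of the findOffsets() lookup discussed in this thread. The
// leader's epoch history maps each leader epoch to the start offset of that
// epoch; the consumer passes in the {offset, leaderEpoch} it last consumed.
public class FindOffsetsSketch {

    // Return the offset closest to a non-divergent position: the end offset
    // of the largest leader epoch <= the saved epoch, capped by the saved
    // offset itself.
    static long findOffset(NavigableMap<Integer, Long> leaderEpochStartOffsets,
                           long savedOffset, int savedEpoch, long leaderLogEndOffset) {
        Integer floorEpoch = leaderEpochStartOffsets.floorKey(savedEpoch);
        if (floorEpoch == null) {
            // No overlapping epoch at all: fall back to the leader's earliest offset.
            return leaderEpochStartOffsets.firstEntry().getValue();
        }
        // End offset of floorEpoch = start offset of the next epoch, or log end.
        Integer nextEpoch = leaderEpochStartOffsets.higherKey(floorEpoch);
        long epochEndOffset = (nextEpoch == null)
                ? leaderLogEndOffset
                : leaderEpochStartOffsets.get(nextEpoch);
        return Math.min(savedOffset, epochEndOffset);
    }

    public static void main(String[] args) {
        NavigableMap<Integer, Long> leader = new TreeMap<>();
        leader.put(0, 0L);   // epoch 0 starts at offset 0
        leader.put(3, 5L);   // epoch 3 starts at offset 5 (epochs 1-2 were lost)
        long logEnd = 12L;

        // Consumer consumed up to offset 8 under epoch 1, which the new
        // leader never saw: seek back to the end of epoch 0, i.e. offset 5.
        if (findOffset(leader, 8L, 1, logEnd) != 5L) throw new AssertionError();
        // No divergence: saved {offset 7, epoch 3} is still a valid position.
        if (findOffset(leader, 7L, 3, logEnd) != 7L) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The point of the cap by savedOffset is that the consumer should never seek
forward past what it actually consumed; it only moves back on divergence.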


For the use-case where the users store committed offsets externally:

1) Such users would have to track the leader epoch together with an offset.
Otherwise, there is no way to detect later what leader epoch was associated
with the message. I think it’s reasonable to ask that from users if they
want to detect log truncation. Otherwise, they will get the current
behavior.


If the users currently get an offset to be stored using position(), I see
two possibilities. First, they save the offset returned from position(),
which they call before poll(). In that case, it would not be correct to
store {offset, leader epoch} if we would have changed position() to return
{offset, leader epoch} since actual fetched message could be different
(from the example I described earlier). So, it would be more correct to
call position() after poll(). However, the user already gets
ConsumerRecords at this point, from which the user can extract {offset,
leader epoch} of the last message.


So, I like the idea of adding a helper method to ConsumerRecords, as Jason
proposed, something like:

public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is
a data struct holding {offset, leader epoch}.


In this case, we would advise the user to follow the workflow: poll(), get
{offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
save offset and leader epoch, process records.


2) When the user needs to seek to the last committed offset, they call new
findOffsets(saved offset, leader epoch), and then seek(offset).


What do you think?


Thanks,

Anna


On Tue, Jul 3, 2018 at 4:06 PM Dong Lin  wrote:

> Hey Jason,
>
> Thanks much for your thoughtful explanation.
>
> Yes the solution using findOffsets(offset, leaderEpoch) also works. The
> advantage of this solution it adds only one API instead of two APIs. The
> concern is that its usage seems a bit more clumsy for advanced users. More
> specifically, advanced users who store offsets externally will always need
> to call findOffsets() before calling seek(offset) during consumer
> initialization. And those advanced users will need to manually keep track
> of the leaderEpoch of the last ConsumerRecord.
>
> The other solution may be more user-friendly for advanced users is to add
> two APIs, `void seek(offset, leaderEpoch)` and `(offset, epoch) =
> offsetEpochs(topicPartition)`.
>
> I kind of prefer the second solution because it is easier to use for
> advanced users. If we need to expose leaderEpoch anyway to safely identify
> a message, it may be conceptually simpler to expose it directly in
> seek(...) rather than requiring one more translation using
> findOffsets(...). But I am also OK with the first solution if other
> developers also favor that one :)
>
> Thanks,
> Dong
>
>
> On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson 
> wrote:
>
> > Hi Dong,
> >
> > Thanks, I've been thinking about your suggestions a bit. It is
> challenging
> > to make this work given the current APIs. One of the difficulties is that
> > we don't have an API to find the leader epoch for a given offset at the

[jira] [Created] (KAFKA-7104) ReplicaFetcher thread may die because of inconsistent log start offset in fetch response

2018-06-26 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7104:
---

 Summary: ReplicaFetcher thread may die because of inconsistent log 
start offset in fetch response
 Key: KAFKA-7104
 URL: https://issues.apache.org/jira/browse/KAFKA-7104
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0, 1.0.0
Reporter: Anna Povzner
Assignee: Anna Povzner


What we saw:

The follower fetched offset 116617, which it was able to successfully append. 
However, the leader's log start offset in the fetch response was 116753, which 
was higher than the fetched offset 116617. When the replica fetcher thread 
tried to increment the log start offset to the leader's log start offset, it 
failed with OffsetOutOfRangeException: 

[2018-06-23 00:45:37,409] ERROR [ReplicaFetcher replicaId=1002, leaderId=1001, 
fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) 
 kafka.common.KafkaException: Error processing data for partition X-N offset 
116617 

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 116753 of partition X-N since it is larger 
than the high watermark 116619

 

In leader's log, we see that log start offset was incremented almost at the 
same time (within one 100 ms or so). 

[2018-06-23 00:45:37,339] INFO Incrementing log start offset of partition X-N 
to 116753 in dir /kafka/kafka-logs (kafka.log.Log)

 

In the leader's logic: ReplicaManager first calls readFromLocalLog(), which 
reads from the local log and returns a LogReadResult that contains the fetched 
data and the leader's log start offset and HW. However, it then calls 
ReplicaManager#updateFollowerLogReadResults(), which may move the leader's log 
start offset and update the leader's log start offset and HW in the fetch 
response. If deleteRecords() happens in between, it is possible that the log 
start offset moves beyond the fetched offset. As a result, the fetch response 
will contain fetched data but a log start offset that is beyond the fetched 
offset (indicating that the fetched data no longer exists on the leader).

When a follower receives such a fetch response, it will first append, then move 
its HW no further than its LEO, which may be less than the leader's log start 
offset in the fetch response, and then call 
`replica.maybeIncrementLogStartOffset(leaderLogStartOffset)`, which will throw 
an OffsetOutOfRangeException, causing the fetcher thread to stop.

 

*Suggested fix:*

If the leader moves log start offset beyond fetched offset, 
ReplicaManager#updateFollowerLogReadResults()  should update the log read 
result with OFFSET_OUT_OF_RANGE error, which will cause the follower to reset 
fetch offset to leader's log start offset.
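The suggested fix boils down to a single check when building the fetch
response. A minimal sketch, with made-up names (the real code lives in
ReplicaManager and is more involved):

```java
// Sketch of the suggested KAFKA-7104 fix: if the leader's log start offset
// moved past the follower's fetch offset while the fetch was being served,
// rewrite the read result as OFFSET_OUT_OF_RANGE so the follower resets to
// the leader's log start offset instead of its fetcher thread dying.
public class LogReadResultCheck {

    enum Error { NONE, OFFSET_OUT_OF_RANGE }

    static Error checkedError(long fetchOffset, long leaderLogStartOffset) {
        // deleteRecords() may advance the log start offset between
        // readFromLocalLog() and building the fetch response.
        return leaderLogStartOffset > fetchOffset
                ? Error.OFFSET_OUT_OF_RANGE
                : Error.NONE;
    }

    public static void main(String[] args) {
        // Offsets from the report: fetched 116617, log start moved to 116753.
        if (checkedError(116617L, 116753L) != Error.OFFSET_OUT_OF_RANGE)
            throw new AssertionError();
        if (checkedError(116617L, 116000L) != Error.NONE)
            throw new AssertionError();
        System.out.println("ok");
    }
}
```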



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2018-06-14 Thread Anna Povzner
Hi Tom,

Just wanted to check what you think about the comments I made in my last
message. I think this KIP is a big improvement to our current policy
interfaces, and really hope we can get this KIP in.

Thanks,
Anna

On Thu, May 31, 2018 at 3:29 PM, Anna Povzner  wrote:

> Hi Tom,
>
>
> Thanks for the KIP. I am aware that the voting thread was started, but
> wanted to discuss a couple of concerns here first.
>
>
> I think the coupling of RequestedTopicState#generatedReplicaAssignment()
> and TopicState#replicasAssignments() does not work well in case where the
> request deals only with a subset of partitions (e.g., add partitions) or no
> assignment at all (alter topic config). In particular:
>
> 1) Alter topic config use case: There is no replica assignment in the
> request, and generatedReplicaAssignment() returning either true or false
> is misleading. The user can interpret this as the assignment being
> generated or provided by the user originally (e.g., on topic create), while
> I don’t think we track such a thing.
>
> 2) On add partitions, we may have manual assignment for new partitions.
> From what I understood of the KIP, generatedReplicaAssignment() will return
> true or false based on whether new partitions were manually assigned or
> not, while TopicState#replicasAssignments() will return replica
> assignments for all partitions. I think it is confusing in a way that
> assignment of old partitions could be auto-generated but new partitions are
> manually assigned.
>
> 3) Generalizing #2: suppose that in the future a user can re-assign replicas for
> a set of partitions.
>
>
> One way to address this with minimal changes to proposed API is to rename
> RequestedTopicState#generatedReplicaAssignment() to 
> RequestedTopicState#manualReplicaAssignment()
> and change the API behavior and description to : “True if the client
> explicitly provided replica assignments in this request, which means that
> some or all assignments returned by TopicState#replicasAssignments() are
> explicitly requested by the user”. The user then will have to diff
> TopicState#replicasAssignments() from clusterState and TopicState#
> replicasAssignments()  from RequestedTopicState, and assume that
> assignments that are different are manually assigned (if
> RequestedTopicState#manualReplicaAssignment()  returns true). We will
> need to clearly document this and it still seems awkward.
>
>
> I think a cleaner way is to make RequestedTopicState to provide replica
> assignments only for partitions that were manually assigned replicas in the
> request that is being validated. Similarly, for alter topic validation, it
> would be nice to make it more clear for the user what has been changed. I
> remember that you already raised that point earlier by comparing current
> proposed API with having separate methods for each specific command.
> However, I agree that it will make it harder to change the interface in the
> future.
>
>
> Could we explore the option of pushing methods that are currently in
> TopicState to CreateTopicRequest and AlterTopicRequest? TopicState will
> still be used for requesting current topic state via ClusterState.
>
> Something like:
>
> interface CreateTopicRequest extends AbstractRequestMetadata {
>
>   // requested number of partitions or if manual assignment is given,
> number of partitions in the assignment
>
>   int numPartitions();
>
>   // requested replication factor, or if manual assignment is given,
> number of replicas in assignment for partition 0
>
>   short replicationFactor();
>
>  // replica assignment requested by the client, or null if assignment is
> auto-generated
>
>  Map<Integer, List<Integer>> manualReplicaAssignment();
>
>  Map<String, String> configs();
>
> }
>
>
> interface AlterTopicRequest extends AbstractRequestMetadata {
>
>   // updated topic configs, or null if not changed
>
>   Map<String, String> updatedConfigs();
>
>   // proposed replica assignment in this request, or null. For adding new
> partitions request, this is proposed replica assignment for new partitions.
> For replica re-assignment case, this is proposed new assignment.
>
>   Map<Integer, List<Integer>> proposedReplicaAssignment();
>
>   // new number of partitions (due to increase/decrease), or null if
> number of partitions not changed
>
>   Integer updatedNumPartitions();
>
> }
>
>
> I did not spend much time on my AlterTopicRequest interface proposal, but
> the idea is basically to return only the parts which were changed. The
> advantage of this approach over having separate methods for each specific
> alter topic request is that it is more flexible for future mixing of what
> can be updated in the topic state.
>
>
> What do you think?
>
>
> Thank

Re: [DISCUSS] KIP-201: Rationalising Policy interfaces

2018-05-31 Thread Anna Povzner
Hi Tom,


Thanks for the KIP. I am aware that the voting thread was started, but
wanted to discuss a couple of concerns here first.


I think the coupling of RequestedTopicState#generatedReplicaAssignment()
and TopicState#replicasAssignments() does not work well in case where the
request deals only with a subset of partitions (e.g., add partitions) or no
assignment at all (alter topic config). In particular:

1) Alter topic config use case: There is no replica assignment in the
request, and generatedReplicaAssignment() returning either true or false
is misleading. The user can interpret this as the assignment being
generated or provided by the user originally (e.g., on topic create), while
I don’t think we track such a thing.

2) On add partitions, we may have manual assignment for new partitions.
From what I understood of the KIP, generatedReplicaAssignment() will return
true or false based on whether new partitions were manually assigned or
not, while TopicState#replicasAssignments() will return replica assignments
for all partitions. I think it is confusing in a way that assignment of old
partitions could be auto-generated but new partitions are manually assigned.


3) Generalizing #2: suppose that in the future a user can re-assign replicas for
a set of partitions.


One way to address this with minimal changes to proposed API is to rename
RequestedTopicState#generatedReplicaAssignment() to
RequestedTopicState#manualReplicaAssignment() and change the API behavior
and description to : “True if the client explicitly provided replica
assignments in this request, which means that some or all assignments
returned by TopicState#replicasAssignments() are explicitly requested by
the user”. The user then will have to diff TopicState#replicasAssignments()
from clusterState and TopicState#replicasAssignments()  from
RequestedTopicState, and assume that assignments that are different are
manually assigned (if RequestedTopicState#manualReplicaAssignment()  returns
true). We will need to clearly document this and it still seems awkward.


I think a cleaner way is to make RequestedTopicState to provide replica
assignments only for partitions that were manually assigned replicas in the
request that is being validated. Similarly, for alter topic validation, it
would be nice to make it more clear for the user what has been changed. I
remember that you already raised that point earlier by comparing current
proposed API with having separate methods for each specific command.
However, I agree that it will make it harder to change the interface in the
future.


Could we explore the option of pushing methods that are currently in
TopicState to CreateTopicRequest and AlterTopicRequest? TopicState will
still be used for requesting current topic state via ClusterState.

Something like:

interface CreateTopicRequest extends AbstractRequestMetadata {

  // requested number of partitions or if manual assignment is given,
number of partitions in the assignment

  int numPartitions();

  // requested replication factor, or if manual assignment is given, number
of replicas in assignment for partition 0

  short replicationFactor();

 // replica assignment requested by the client, or null if assignment is
auto-generated

 Map<Integer, List<Integer>> manualReplicaAssignment();

 Map<String, String> configs();

}


interface AlterTopicRequest extends AbstractRequestMetadata {

  // updated topic configs, or null if not changed

  Map<String, String> updatedConfigs();

  // proposed replica assignment in this request, or null. For adding new
partitions request, this is proposed replica assignment for new partitions.
For replica re-assignment case, this is proposed new assignment.

  Map<Integer, List<Integer>> proposedReplicaAssignment();

  // new number of partitions (due to increase/decrease), or null if number
of partitions not changed

  Integer updatedNumPartitions();

}


I did not spend much time on my AlterTopicRequest interface proposal, but
the idea is basically to return only the parts which were changed. The
advantage of this approach over having separate methods for each specific
alter topic request is that it is more flexible for future mixing of what
can be updated in the topic state.
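To make the "only the changed parts are non-null" idea concrete, here is a toy
rendering of the AlterTopicRequest sketch with a minimal policy on top. All
names are the proposal's illustrative API (plus a hypothetical validate()
helper), not shipped Kafka interfaces:

```java
import java.util.Map;

// Toy rendering of the AlterTopicRequest sketch from this message.
public class PolicySketch {

    interface AlterTopicRequest {
        Map<String, String> updatedConfigs();  // null if configs unchanged
        Integer updatedNumPartitions();        // null if partition count unchanged
    }

    // Example policy: since only the changed parts of the request are
    // non-null, a validator can check exactly those parts.
    static void validate(AlterTopicRequest req, int currentNumPartitions) {
        Integer n = req.updatedNumPartitions();
        if (n != null && n < currentNumPartitions)
            throw new IllegalArgumentException("decreasing partitions not allowed");
    }

    public static void main(String[] args) {
        AlterTopicRequest shrink = new AlterTopicRequest() {
            public Map<String, String> updatedConfigs() { return null; }
            public Integer updatedNumPartitions() { return 3; }
        };
        boolean rejected = false;
        try { validate(shrink, 6); } catch (IllegalArgumentException e) { rejected = true; }
        if (!rejected) throw new AssertionError();
        System.out.println("ok");
    }
}
```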


What do you think?


Thanks,

Anna


On Mon, Oct 9, 2017 at 1:39 AM, Tom Bentley  wrote:

> I've added RequestedTopicState, as discussed in my last email.
>
> I've also added a paragraph to the migration plan about old clients making
> policy-violating delete topics or delete records request.
>
> If no further comments a forthcoming in the next day or two then I will
> start a vote.
>
> Thanks,
>
> Tom
>
> On 5 October 2017 at 12:41, Tom Bentley  wrote:
>
> > I'd like to raise a somewhat subtle point about how the proposed API
> > should behave.
> >
> > The current CreateTopicPolicy gets passed either the request partition
> > count and replication factor, or the requested assignment. So if the
> > request had specified partition count and replication factor, the policy
> > sees a null replicaAssignments(). Likewise if the 

[jira] [Created] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning

2018-05-31 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6975:
---

 Summary: AdminClient.deleteRecords() may cause replicas unable to 
fetch from beginning
 Key: KAFKA-6975
 URL: https://issues.apache.org/jira/browse/KAFKA-6975
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.0
Reporter: Anna Povzner
Assignee: Anna Povzner


AdminClient.deleteRecords(beforeOffset(offset)) will set the log start offset 
to the requested offset. If the requested offset is in the middle of a batch, 
the replica will not be able to fetch from that offset (because it points into 
the middle of the batch). 
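To illustrate the hazard, here is a toy model (illustrative names only, the
log reduced to its batch base offsets) of checking whether a log start offset
points into the middle of a batch:

```java
import java.util.NavigableSet;
import java.util.TreeSet;

// Toy illustration of the KAFKA-6975 hazard: deleteRecords(beforeOffset(o))
// can set the log start offset to a mid-batch offset, and a follower fetching
// from exactly that offset cannot be served.
public class MidBatchCheck {

    // A log is modeled as the set of its batch base offsets.
    static boolean isMidBatch(NavigableSet<Long> batchBaseOffsets, long offset) {
        Long base = batchBaseOffsets.floor(offset);
        return base != null && base != offset;
    }

    public static void main(String[] args) {
        NavigableSet<Long> bases = new TreeSet<>();
        bases.add(0L); bases.add(100L); bases.add(200L); // batches of 100 records
        if (!isMidBatch(bases, 150L)) throw new AssertionError(); // mid-batch offset
        if (isMidBatch(bases, 200L)) throw new AssertionError();  // batch base is fine
        System.out.println("ok");
    }
}
```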

One use-case where this could cause problems is replica re-assignment. Suppose 
we have a topic partition with 3 initial replicas, and at some point the user 
issues AdminClient.deleteRecords() for an offset that falls in the middle of a 
batch. That offset now becomes the log start offset for this topic partition. Suppose at 
some later time, the user starts partition re-assignment to 3 new replicas. The 
new replicas (followers) will start with HW = 0, will try to fetch from 0, then 
get "out of order offset" because 0 < log start offset (LSO); the follower will 
be able to reset offset to LSO of the leader and fetch LSO; the leader will 
send a batch in response with base offset 

[jira] [Resolved] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-05-04 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner resolved KAFKA-6795.
-
   Resolution: Fixed
Fix Version/s: 2.0.0

> Add unit test for ReplicaAlterLogDirsThread
> ---
>
> Key: KAFKA-6795
> URL: https://issues.apache.org/jira/browse/KAFKA-6795
> Project: Kafka
>  Issue Type: Improvement
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
>Priority: Major
> Fix For: 2.0.0
>
>
> ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
> there is no unit test. 
> [~lindong] I assigned this to myself, since ideally I wanted to add unit 
> tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6859) Follower should not send OffsetForLeaderEpoch for undefined leader epochs

2018-05-03 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6859:
---

 Summary: Follower should not send OffsetForLeaderEpoch for 
undefined leader epochs
 Key: KAFKA-6859
 URL: https://issues.apache.org/jira/browse/KAFKA-6859
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Anna Povzner


This is more of an optimization, rather than correctness.

Currently, if the follower is on inter-broker protocol version 0.11 or higher, 
but on an older message format, it does not track leader epochs. However, it 
will still send an OffsetForLeaderEpoch request to the leader with an undefined 
epoch, which is guaranteed to return an undefined offset, so that the follower 
truncates to its high watermark. Another example is a bootstrapping follower 
that does not have any leader epochs recorded yet.

It is cleaner and more efficient for the follower to not send 
OffsetForLeaderEpoch requests with undefined leader epochs, since we already 
know the answer.
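The optimization is a one-line guard before the request is built. A sketch
with an illustrative method name (UNDEFINED_EPOCH = -1 matches Kafka's
convention for an unknown epoch):

```java
// Sketch of the KAFKA-6859 optimization: skip the OffsetForLeaderEpoch
// round-trip when the follower has no defined leader epoch, since the answer
// (truncate to high watermark) is already known.
public class EpochRequestCheck {
    static final int UNDEFINED_EPOCH = -1;

    static boolean shouldSendOffsetForLeaderEpoch(int latestEpoch) {
        return latestEpoch != UNDEFINED_EPOCH;
    }

    public static void main(String[] args) {
        if (shouldSendOffsetForLeaderEpoch(UNDEFINED_EPOCH)) throw new AssertionError();
        if (!shouldSendOffsetForLeaderEpoch(5)) throw new AssertionError();
        System.out.println("ok");
    }
}
```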



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6824:
---

 Summary: Transient failure in 
DynamicBrokerReconfigurationTest.testAddRemoveSslListener
 Key: KAFKA-6824
 URL: https://issues.apache.org/jira/browse/KAFKA-6824
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Saw in my PR build (*JDK 7 and Scala 2.11*):

*17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>

*17:20:49*         at org.junit.Assert.fail(Assert.java:88)

*17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

*17:20:50*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6823) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize

2018-04-24 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6823:
---

 Summary: Transient failure in 
DynamicBrokerReconfigurationTest.testThreadPoolResize
 Key: KAFKA-6823
 URL: https://issues.apache.org/jira/browse/KAFKA-6823
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Saw in my PR build (*JDK 10 and Scala 2.12*):

*15:58:46* kafka.server.DynamicBrokerReconfigurationTest > testThreadPoolResize 
FAILED

*15:58:46*     java.lang.AssertionError: Invalid threads: expected 6, got 7: 
List(ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-1, 
ReplicaFetcherThread-0-2, ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-2, 
ReplicaFetcherThread-1-0, ReplicaFetcherThread-0-1)

*15:58:46*         at org.junit.Assert.fail(Assert.java:88)

*15:58:46*         at org.junit.Assert.assertTrue(Assert.java:41)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1147)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.maybeVerifyThreadPoolSize$1(DynamicBrokerReconfigurationTest.scala:412)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:431)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.reducePoolSize$1(DynamicBrokerReconfigurationTest.scala:417)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:440)

*15:58:46*         at 
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:439)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:453)





[jira] [Created] (KAFKA-6809) connections-created metric does not behave as expected

2018-04-19 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6809:
---

 Summary: connections-created metric does not behave as expected
 Key: KAFKA-6809
 URL: https://issues.apache.org/jira/browse/KAFKA-6809
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.1
Reporter: Anna Povzner


"connections-created" sensor is described as "new connections established". It 
currently records only connections that the broker creates, but does not count 
connections received. It seems we should also count connections received – 
either include them in this metric (and also clarify the description) or add 
a new metric (counting the two types of connections separately). I am not sure 
how useful it is to separate them, so I think we should take the first approach.
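To make the two counts concrete, here is a toy Python sketch of the distinction the ticket draws; all names are illustrative stand-ins, not the broker's actual sensors:

```python
class ConnectionMetrics:
    """Toy stand-in for a broker's connection sensors (not Kafka code)."""
    def __init__(self):
        self.created = 0   # connections this node initiated
        self.accepted = 0  # inbound connections received

    def on_connect(self):
        self.created += 1

    def on_accept(self):
        self.accepted += 1

    @property
    def connections_created(self):
        # First approach from the ticket: one metric covering both kinds,
        # matching the "new connections established" description.
        return self.created + self.accepted
```

The second approach would instead expose `created` and `accepted` as two separate metrics.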





Re: [VOTE] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-17 Thread Anna Povzner
The vote for KIP-279 has passed with 5 binding and 2 non-binding +1s (and
no objections).


Thanks everyone for your reviews and feedback,

Anna


On Tue, Apr 17, 2018 at 1:49 PM, Anna Povzner <a...@confluent.io> wrote:

> Guozhang, thanks for catching this, I fixed the description (the example
> assumed response with 21, '11' was a typo).
>
> On Tue, Apr 17, 2018 at 1:25 PM, Anna Povzner <a...@confluent.io> wrote:
>
>> Hi Colin,
>>
>> Yes, the impact of "losing" entries in the LeaderEpoch file is more
>> round-trips for OffsetForLeaderEpoch. There is a JIRA for the log cleaner,
>> https://issues.apache.org/jira/browse/KAFKA-6780, to investigate whether
>> there is an actual possibility of losing committed records due to cleaning
>> further than the high watermark. In any case, this KIP does not make it any
>> more or less likely.
>>
>> Thanks,
>> Anna
>>
>>
>>
>> On Tue, Apr 17, 2018 at 11:53 AM, Colin McCabe <cmcc...@apache.org>
>> wrote:
>>
>>> Thanks, Anna, this looks great.
>>>
>>> From the KIP:
>>>
>>>  > Impact of topic compaction
>>>  >
>>>  > The proposed solution requires that we preserve history in
>>>  > LeaderEpochSequence file. Note that this is also required in the
>>> current
>>>  > implementation if we want to guarantee no log divergence. The only
>>> reason
>>>  > for "losing" entries in LeaderEpoch file is if we actually lose
>>>  > LeaderEpoch file and have to rebuild it from the log. If we delete
>>> all
>>>  > offsets for a particular epoch for some topic partition, we may miss
>>> some
>>>  > entries in the LeaderEpochSequence file.
>>>  >
>>>  > We will not do any changes to compaction logic in this KIP, but here
>>> are
>>>  > possible fixes to compaction logic:
>>>  >
>>>  > Leave a tombstone in the log if we delete all offsets for some
>>> epoch,
>>>  > so that LeaderEpoch file can be rebuilt
>>>  > Do not compact further than persistent HW.
>>>
>>> Sorry if this has been answered before (I didn't find it in the DISCUSS
>>> thread) but what is the impact of "losing" these entries in the LeaderEpoch
>>> file?  I suppose it would mean that more round-trips for
>>> OffsetForLeaderEpoch might be required during a leader change.  That's the
>>> only impact, right?
>>>
>>> best,
>>> Colin
>>>
>>> On Mon, Apr 16, 2018, at 09:48, Ismael Juma wrote:
>>> > Thanks for the detailed KIP. +1 (binding)
>>> >
>>> > Ismael
>>> >
>>> > On Sat, Apr 14, 2018 at 3:54 PM, Anna Povzner <a...@confluent.io>
>>> wrote:
>>> >
>>> > > Hi All,
>>> > >
>>> > >
>>> > > I would like to start the vote on KIP-279: Fix log divergence between
>>> > > leader and follower after fast leader fail over.
>>> > >
>>> > >
>>> > > For reference, here's the KIP wiki:
>>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> > > 279%3A+Fix+log+divergence+between+leader+and+follower+
>>> > > after+fast+leader+fail+over
>>> > >
>>> > >
>>> > >
>>> > > and discussion thread:
>>> > > https://www.mail-archive.com/dev@kafka.apache.org/msg86753.html
>>> > >
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Anna
>>> > >
>>>
>>
>>
>


Re: [VOTE] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-17 Thread Anna Povzner
Guozhang, thanks for catching this, I fixed the description (the example
assumed response with 21, '11' was a typo).

On Tue, Apr 17, 2018 at 1:25 PM, Anna Povzner <a...@confluent.io> wrote:

> Hi Colin,
>
> Yes, the impact of "losing" entries in the LeaderEpoch file is more
> round-trips for OffsetForLeaderEpoch. There is a JIRA for the log cleaner,
> https://issues.apache.org/jira/browse/KAFKA-6780, to investigate whether
> there is an actual possibility of losing committed records due to cleaning
> further than the high watermark. In any case, this KIP does not make it any
> more or less likely.
>
> Thanks,
> Anna
>
>
>
> On Tue, Apr 17, 2018 at 11:53 AM, Colin McCabe <cmcc...@apache.org> wrote:
>
>> Thanks, Anna, this looks great.
>>
>> From the KIP:
>>
>>  > Impact of topic compaction
>>  >
>>  > The proposed solution requires that we preserve history in
>>  > LeaderEpochSequence file. Note that this is also required in the
>> current
>>  > implementation if we want to guarantee no log divergence. The only
>> reason
>>  > for "losing" entries in LeaderEpoch file is if we actually lose
>>  > LeaderEpoch file and have to rebuild it from the log. If we delete all
>>  > offsets for a particular epoch for some topic partition, we may miss
>> some
>>  > entries in the LeaderEpochSequence file.
>>  >
>>  > We will not do any changes to compaction logic in this KIP, but here
>> are
>>  > possible fixes to compaction logic:
>>  >
>>  > Leave a tombstone in the log if we delete all offsets for some
>> epoch,
>>  > so that LeaderEpoch file can be rebuilt
>>  > Do not compact further than persistent HW.
>>
>> Sorry if this has been answered before (I didn't find it in the DISCUSS
>> thread) but what is the impact of "losing" these entries in the LeaderEpoch
>> file?  I suppose it would mean that more round-trips for
>> OffsetForLeaderEpoch might be required during a leader change.  That's the
>> only impact, right?
>>
>> best,
>> Colin
>>
>> On Mon, Apr 16, 2018, at 09:48, Ismael Juma wrote:
>> > Thanks for the detailed KIP. +1 (binding)
>> >
>> > Ismael
>> >
>> > On Sat, Apr 14, 2018 at 3:54 PM, Anna Povzner <a...@confluent.io>
>> wrote:
>> >
>> > > Hi All,
>> > >
>> > >
>> > > I would like to start the vote on KIP-279: Fix log divergence between
>> > > leader and follower after fast leader fail over.
>> > >
>> > >
>> > > For reference, here's the KIP wiki:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > 279%3A+Fix+log+divergence+between+leader+and+follower+
>> > > after+fast+leader+fail+over
>> > >
>> > >
>> > >
>> > > and discussion thread:
>> > > https://www.mail-archive.com/dev@kafka.apache.org/msg86753.html
>> > >
>> > >
>> > > Thanks,
>> > >
>> > > Anna
>> > >
>>
>
>


Re: [VOTE] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-17 Thread Anna Povzner
Hi Colin,

Yes, the impact of "losing" entries in the LeaderEpoch file is more
round-trips for OffsetForLeaderEpoch. There is a JIRA for the log cleaner,
https://issues.apache.org/jira/browse/KAFKA-6780, to investigate whether
there is an actual possibility of losing committed records due to cleaning
further than the high watermark. In any case, this KIP does not make it any
more or less likely.

Thanks,
Anna



On Tue, Apr 17, 2018 at 11:53 AM, Colin McCabe <cmcc...@apache.org> wrote:

> Thanks, Anna, this looks great.
>
> From the KIP:
>
>  > Impact of topic compaction
>  >
>  > The proposed solution requires that we preserve history in
>  > LeaderEpochSequence file. Note that this is also required in the
> current
>  > implementation if we want to guarantee no log divergence. The only
> reason
>  > for "losing" entries in LeaderEpoch file is if we actually lose
>  > LeaderEpoch file and have to rebuild it from the log. If we delete all
>  > offsets for a particular epoch for some topic partition, we may miss
> some
>  > entries in the LeaderEpochSequence file.
>  >
>  > We will not do any changes to compaction logic in this KIP, but here are
>  > possible fixes to compaction logic:
>  >
>  > Leave a tombstone in the log if we delete all offsets for some
> epoch,
>  > so that LeaderEpoch file can be rebuilt
>  > Do not compact further than persistent HW.
>
> Sorry if this has been answered before (I didn't find it in the DISCUSS
> thread) but what is the impact of "losing" these entries in the LeaderEpoch
> file?  I suppose it would mean that more round-trips for
> OffsetForLeaderEpoch might be required during a leader change.  That's the
> only impact, right?
>
> best,
> Colin
>
> On Mon, Apr 16, 2018, at 09:48, Ismael Juma wrote:
> > Thanks for the detailed KIP. +1 (binding)
> >
> > Ismael
> >
> > On Sat, Apr 14, 2018 at 3:54 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > Hi All,
> > >
> > >
> > > I would like to start the vote on KIP-279: Fix log divergence between
> > > leader and follower after fast leader fail over.
> > >
> > >
> > > For reference, here's the KIP wiki:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 279%3A+Fix+log+divergence+between+leader+and+follower+
> > > after+fast+leader+fail+over
> > >
> > >
> > >
> > > and discussion thread:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg86753.html
> > >
> > >
> > > Thanks,
> > >
> > > Anna
> > >
>


[jira] [Created] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-04-16 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6795:
---

 Summary: Add unit test for ReplicaAlterLogDirsThread
 Key: KAFKA-6795
 URL: https://issues.apache.org/jira/browse/KAFKA-6795
 Project: Kafka
  Issue Type: Improvement
Reporter: Anna Povzner
Assignee: Anna Povzner


ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
there is no unit test. 

[~lindong] I assigned this to myself, since ideally I wanted to add unit tests 
for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 

 





[VOTE] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-14 Thread Anna Povzner
Hi All,


I would like to start the vote on KIP-279: Fix log divergence between
leader and follower after fast leader fail over.


For reference, here's the KIP wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over



and discussion thread:
https://www.mail-archive.com/dev@kafka.apache.org/msg86753.html


Thanks,

Anna


Re: [DISCUSS] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-13 Thread Anna Povzner
Thanks everyone for the feedback. I will start a voting thread tomorrow
morning if there are no more comments.

Regards,
Anna


On Wed, Apr 11, 2018 at 3:14 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Anna,
>
> Thanks for the KIP. Looks good to me.
>
> Great point on bounding the cleaning point in a compacted topic by high
> watermark. Filed https://issues.apache.org/jira/browse/KAFKA-6780 to track
> it.
>
> Jun
>
>
> On Thu, Apr 5, 2018 at 12:17 PM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi,
> >
> >
> > I just created KIP-279 to fix edge cases of log divergence for both clean
> > and unclean leader election configs.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 279%3A+Fix+log+divergence+between+leader+and+follower+
> > after+fast+leader+fail+over
> >
> >
> > The KIP is basically a follow up to KIP-101, and proposes a slight
> > extension to the replication protocol to fix edge cases where logs can
> > diverge due to fast leader fail over.
> >
> >
> > Feedback and suggestions are welcome!
> >
> >
> > Thanks,
> >
> > Anna
> >
>


[jira] [Resolved] (KAFKA-6693) Add Consumer-only benchmark workload to Trogdor

2018-04-10 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner resolved KAFKA-6693.
-
Resolution: Fixed

https://github.com/apache/kafka/pull/4775

> Add Consumer-only benchmark workload to Trogdor
> ---
>
> Key: KAFKA-6693
> URL: https://issues.apache.org/jira/browse/KAFKA-6693
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>    Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Consumer-only benchmark workload that uses existing pre-populated topic





Re: [DISCUSS] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-09 Thread Anna Povzner
Ted and Jason, I see now how the description of unclean leader election
made the proposed approach sound more complicated (as if there are more
roundtrips). I wrote it that way to show correctness: theoretically, we
could compare the "complete epoch lineage", but in practice we compare only
one or two recent leader epochs.

So, Jason's statements are correct. The common case for unclean leader
election is still one roundtrip, including the rare case reported in
KAFKA-6361 (two fast consecutive leader failovers).

I updated the description of handling unclean leader elections to address
this.
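To make the "one or two recent leader epochs" point concrete, here is a simplified Python simulation of the OffsetsForLeaderEpoch exchange, assuming each replica's epoch lineage is a list of (epoch, end_offset) pairs; this is illustrative only, not the broker implementation:

```python
def offsets_for_leader_epoch(leader_epochs, requested_epoch):
    # Leader replies with the largest epoch <= requested_epoch that it
    # knows, plus that epoch's end offset (simplified KIP-279 response).
    for epoch, end in sorted(leader_epochs, reverse=True):
        if epoch <= requested_epoch:
            return epoch, end
    return -1, 0

def truncate_to_leader(follower_epochs, leader_epochs):
    # Returns (truncation offset, roundtrips). Common case: the leader's
    # first answer names an epoch the follower knows -> one roundtrip.
    known = dict(follower_epochs)          # epoch -> follower end offset
    ask = follower_epochs[-1][0]           # start from the latest epoch
    roundtrips = 0
    while ask >= 0:
        roundtrips += 1
        l_epoch, l_end = offsets_for_leader_epoch(leader_epochs, ask)
        if l_epoch in known:
            return min(l_end, known[l_epoch]), roundtrips
        # Rare case (fast consecutive failovers): the follower is missing
        # that epoch too, so it retries with its largest epoch below it.
        ask = max((e for e in known if e < l_epoch), default=-1)
    return 0, roundtrips
```

The divergence from KAFKA-6361 (follower has an epoch the leader never saw) still resolves in one roundtrip here, because the leader's response already names an epoch the follower knows.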



On Mon, Apr 9, 2018 at 10:01 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Anna,
>
> Thanks for picking this up! I think the solution looks good to me. Just
> wanted to check my understanding on one part. When describing the handling
> of unclean leader elections, you mention comparing the "complete epoch
> lineage" from both brokers in order to converge on the log. I think this
> makes it sound a bit scarier than it actually is. In practice, it seems
> like we'd only have multiple round trips if we hit a bunch of these already
> rare "fast leader failover" cases consecutively. As far as I know, the case
> I reported is the only instance we've seen in the wild, and with this
> solution, we'd only need one round trip to handle it. So while it may be
> theoretically possible to need multiple round trips for convergence, far
> and away the common case would only require a very small number (usually
> exactly one).
>
> Is that correct?
>
> Thanks,
> Jason
>
> On Fri, Apr 6, 2018 at 5:47 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> > Makes sense.
> > Thanks for the explanation.
> >  Original message From: Anna Povzner <a...@confluent.io>
> > Date: 4/6/18  5:38 PM  (GMT-08:00) To: dev@kafka.apache.org Subject: Re:
> > [DISCUSS] KIP-279: Fix log divergence between leader and follower after
> > fast leader fail over
> > Hi Ted,
> >
> > I updated the Rejected Alternatives section with a more thorough
> > description of alternatives and reasoning for choosing the solution we
> > proposed.
> >
> > While it is more clear why the second alternative guarantees one
> roundtrip
> > for the clean leader election case, the proposed solution also guarantees
> > it. This is based on the fact that we cannot have more than one
> > back-to-back leader change due to preferred leader election where the
> > leader is not pushed out of the ISR, which means the follower will have
> at
> > most one leader epoch unknown to the new leader, and so the leader will
> be
> > able to respond with the epoch that the follower knows about in the first
> > response.
> >
> > For unclean leader election case, the second alternative reduces the
> number
> > of roundtrips but for rare cases: we need at least 3 fast leader changes
> to
> > see the advantage. Approximate calculation: Proposed solution requires
> > (N+1)/2 roundtrips for N fast leader changes (worst-case, could be less
> > roundtrips for the same number of leader changes); Alternative solution
> > requires at most 2 roundtrips (except super rare cases, where we may want
> > to limit the size of OffsetForLeaderEpoch request). This comes at the
> cost
> > of a bigger change in the OffsetForLeaderEpoch request,
> > larger OffsetForLeaderEpoch request size on average, and additional
> > complexity of dealing with how long the sequence should be for the
> > subsequent OffsetForLeaderEpoch requests, handling the edge/contrived
> cases
> > where sequence may become too long.
> >
> > So, I think, the main trade-off here is improving efficiency of a broker
> > becoming a follower in rare cases of unclean leader election/at least 3
> > fast leader changes vs. less complexity in the common case. The proposed
> > solution in the KIP is for less complexity.
> >
> > Please let me know if you have any concerns or suggestions.
> >
> > Thanks,
> > Anna
> >
> > On Thu, Apr 5, 2018 at 1:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >
> > > For the second alternative which was rejected (The follower sends all
> > > sequences of {leader_epoch, end_offset})
> > >
> > > bq. also increases the size of OffsetForLeaderEpoch request by at least
> > > 64bit
> > >
> > > Though the size increases, the number of roundtrips is reduced
> > meaningfully
> > > which would increase the robustness of the solution.
> > >
> > > Please expand the reasoning for unclean leader election for this
> >

Re: [DISCUSS] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-06 Thread Anna Povzner
Hi Ted,

I updated the Rejected Alternatives section with a more thorough
description of alternatives and reasoning for choosing the solution we
proposed.

While it is more clear why the second alternative guarantees one roundtrip
for the clean leader election case, the proposed solution also guarantees
it. This is based on the fact that we cannot have more than one
back-to-back leader change due to preferred leader election where the
leader is not pushed out of the ISR, which means the follower will have at
most one leader epoch unknown to the new leader, and so the leader will be
able to respond with the epoch that the follower knows about in the first
response.

For unclean leader election case, the second alternative reduces the number
of roundtrips but for rare cases: we need at least 3 fast leader changes to
see the advantage. Approximate calculation: Proposed solution requires
(N+1)/2 roundtrips for N fast leader changes (worst case; it could be fewer
roundtrips for the same number of leader changes); Alternative solution
requires at most 2 roundtrips (except super rare cases, where we may want
to limit the size of OffsetForLeaderEpoch request). This comes at the cost
of a bigger change in the OffsetForLeaderEpoch request,
larger OffsetForLeaderEpoch request size on average, and additional
complexity of dealing with how long the sequence should be for the
subsequent OffsetForLeaderEpoch requests, handling the edge/contrived cases
where sequence may become too long.

So, I think, the main trade-off here is improving efficiency of a broker
becoming a follower in rare cases of unclean leader election/at least 3
fast leader changes vs. less complexity in the common case. The proposed
solution in the KIP is for less complexity.
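As a back-of-the-envelope rendering of the worst-case comparison above (the bound of 2 roundtrips for the alternative is taken from the text; actual counts depend on the specific epoch history):

```python
def proposed_roundtrips(n):
    # Worst case for the proposed protocol: (N + 1) // 2 roundtrips
    # for N fast consecutive leader changes.
    return (n + 1) // 2

# Worst-case comparison against the rejected alternative's bound of 2
# roundtrips, for a few values of N.
table = {n: (proposed_roundtrips(n), 2) for n in (1, 3, 5, 7)}
```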

Please let me know if you have any concerns or suggestions.

Thanks,
Anna

On Thu, Apr 5, 2018 at 1:33 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> For the second alternative which was rejected (The follower sends all
> sequences of {leader_epoch, end_offset})
>
> bq. also increases the size of OffsetForLeaderEpoch request by at least
> 64bit
>
> Though the size increases, the number of roundtrips is reduced meaningfully
> which would increase the robustness of the solution.
>
> Please expand the reasoning for unclean leader election for this
> alternative.
>
> Thanks
>
> On Thu, Apr 5, 2018 at 12:17 PM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi,
> >
> >
> > I just created KIP-279 to fix edge cases of log divergence for both clean
> > and unclean leader election configs.
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 279%3A+Fix+log+divergence+between+leader+and+follower+
> > after+fast+leader+fail+over
> >
> >
> > The KIP is basically a follow up to KIP-101, and proposes a slight
> > extension to the replication protocol to fix edge cases where logs can
> > diverge due to fast leader fail over.
> >
> >
> > Feedback and suggestions are welcome!
> >
> >
> > Thanks,
> >
> > Anna
> >
>


[DISCUSS] KIP-279: Fix log divergence between leader and follower after fast leader fail over

2018-04-05 Thread Anna Povzner
Hi,


I just created KIP-279 to fix edge cases of log divergence for both clean
and unclean leader election configs.


https://cwiki.apache.org/confluence/display/KAFKA/KIP-279%3A+Fix+log+divergence+between+leader+and+follower+after+fast+leader+fail+over


The KIP is basically a follow up to KIP-101, and proposes a slight
extension to the replication protocol to fix edge cases where logs can
diverge due to fast leader fail over.


Feedback and suggestions are welcome!


Thanks,

Anna


[jira] [Created] (KAFKA-6693) Add Consumer-only benchmark workload to Trogdor

2018-03-20 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6693:
---

 Summary: Add Consumer-only benchmark workload to Trogdor
 Key: KAFKA-6693
 URL: https://issues.apache.org/jira/browse/KAFKA-6693
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Anna Povzner
Assignee: Anna Povzner


Consumer-only benchmark workload that uses existing pre-populated topic





[jira] [Commented] (KAFKA-4691) ProducerInterceptor.onSend() is called after key and value are serialized

2017-01-25 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15838829#comment-15838829
 ] 

Anna Povzner commented on KAFKA-4691:
-

I agree with [~mjsax] about not changing the KafkaProducer API. Instead, if we 
make that change, we should not configure any producer interceptors on the 
producer and let Streams do the intercepting.

In the case of completely disabling the producer interceptor and implementing 
this functionality in Streams, RecordCollectorImpl.send() should also call the 
interceptor's onAcknowledgement() in the same situations as KafkaProducer 
does. E.g., if send() fails, onAcknowledgement() should be called with mostly 
empty RecordMetadata but with topic and partition set. Also, 
onAcknowledgement() should be called from the onCompletion callback in 
RecordCollectorImpl.send(). It looks like all of this could be implemented in 
RecordCollectorImpl.send(). 
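A rough Python sketch of the ordering described above (intercept before serialization, then acknowledge on both success and failure, with partial metadata on failure). This is purely illustrative: the real interceptor interfaces are Java, and all names here are hypothetical:

```python
class CountingInterceptor:
    """Hypothetical interceptor mimicking the ProducerInterceptor contract."""
    def __init__(self):
        self.acks = []

    def on_send(self, record):
        # Per the JavaDoc contract, this must see the *unserialized* record.
        return record

    def on_acknowledgement(self, metadata, error):
        self.acks.append((metadata, error))

def send(record, serialize, deliver, interceptor):
    # Sketch of the ordering the comment asks RecordCollectorImpl.send()
    # to preserve: intercept first, serialize after, always acknowledge.
    record = interceptor.on_send(record)
    try:
        metadata = deliver(serialize(record))   # (topic, partition, offset)
        interceptor.on_acknowledgement(metadata, None)
        return metadata
    except Exception as err:
        # On failure: mostly empty metadata, with only the topic (and
        # partition, if known) set, as described above.
        interceptor.on_acknowledgement((record["topic"], None, None), err)
        raise
```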

> ProducerInterceptor.onSend() is called after key and value are serialized
> -
>
> Key: KAFKA-4691
> URL: https://issues.apache.org/jira/browse/KAFKA-4691
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 0.10.1.1
>Reporter: Francesco Lemma
>  Labels: easyfix
> Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - 
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc 
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
>  " This is called from KafkaProducer.send(ProducerRecord) and 
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value 
> get serialized and partition is assigned (if partition is not specified in 
> ProducerRecord)".
> Although when using this with Kafka Streams 
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)) the 
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside 
> RecordCollectionImpl.send(ProducerRecord<K, V> record, Serializer 
> keySerializer, Serializer valueSerializer,
> StreamPartitioner<K, V> partitioner), effectively 
> before calling the send method of the producer which will trigger the 
> interceptor.
> This makes it unable to perform any kind of operation involving the key or 
> value of the message, unless at least performing an additional 
> deserialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Work started] (KAFKA-3777) Extract the existing LRU cache out of RocksDBStore

2016-07-22 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3777 started by Anna Povzner.
---
> Extract the existing LRU cache out of RocksDBStore
> --
>
> Key: KAFKA-3777
> URL: https://issues.apache.org/jira/browse/KAFKA-3777
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Eno Thereska
>    Assignee: Anna Povzner
> Fix For: 0.10.1.0
>
>
> The LRU cache currently lives inside the RocksDbStore class. As part of 
> KAFKA-3776 it needs to come out of RocksDbStore and become a separate 
> component used in:
> 1. KGroupedStream.aggregate() / reduce(), 
> 2. KStream.aggregateByKey() / reduceByKey(),
> 3. KTable.to() (this will be done in KAFKA-3779).
> All of the above operators can have a cache on top to deduplicate writes to 
> the materialized state store in RocksDB.
> The scope of this JIRA is to extract the cache out of RocksDBStore and keep 
> it for items 1) and 2) above; it should be done together with / after 
> KAFKA-3780.
> Note it is NOT in the scope of this JIRA to re-write the cache, so this will 
> basically stay the same record-based cache we currently have.
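A minimal sketch of such a record-based deduplicating LRU cache — illustrative Python, not the Streams implementation:

```python
from collections import OrderedDict

class DedupCache:
    """Record-based LRU cache sketch: newer writes to the same key
    overwrite older ones, so only the latest value reaches the
    underlying store on eviction (not the actual Streams code)."""
    def __init__(self, max_records, flush):
        self.max_records = max_records
        self.flush = flush              # callback to the underlying store
        self.records = OrderedDict()

    def put(self, key, value):
        if key in self.records:
            self.records.pop(key)       # refresh LRU position, dedup write
        self.records[key] = value
        if len(self.records) > self.max_records:
            old_key, old_val = self.records.popitem(last=False)
            self.flush(old_key, old_val)
```

The deduplication is the point: repeated updates to one key cost one store write, not many.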





[jira] [Updated] (KAFKA-3597) Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

2016-04-28 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3597:

Status: Patch Available  (was: In Progress)

> Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly
> 
>
> Key: KAFKA-3597
> URL: https://issues.apache.org/jira/browse/KAFKA-3597
> Project: Kafka
>  Issue Type: Test
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> It would be useful for some tests to check if ConsoleConsumer and 
> VerifiableProducer shutdown cleanly or not. 
> Add methods to ConsoleConsumer and VerifiableProducer that return true if all 
> producers/consumers shut down cleanly; otherwise false. 





[jira] [Work started] (KAFKA-3566) Enable VerifiableProducer and ConsoleConsumer to run with interceptors

2016-04-27 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3566 started by Anna Povzner.
---
> Enable VerifiableProducer and ConsoleConsumer to run with interceptors
> --
>
> Key: KAFKA-3566
> URL: https://issues.apache.org/jira/browse/KAFKA-3566
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.10.0.0
>    Reporter: Anna Povzner
>Assignee: Anna Povzner
>  Labels: test
>
> Add interceptor class list and export path list params to VerifiableProducer 
> and ConsoleConsumer constructors. This is to allow running VerifiableProducer 
> and ConsoleConsumer with interceptors.





[jira] [Work started] (KAFKA-3597) Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

2016-04-27 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3597 started by Anna Povzner.
---
> Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly
> 
>
> Key: KAFKA-3597
> URL: https://issues.apache.org/jira/browse/KAFKA-3597
> Project: Kafka
>  Issue Type: Test
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> It would be useful for some tests to check if ConsoleConsumer and 
> VerifiableProducer shutdown cleanly or not. 
> Add methods to ConsoleConsumer and VerifiableProducer that return true if all 
> producers/consumers shut down cleanly; otherwise false. 





[jira] [Created] (KAFKA-3597) Enable query ConsoleConsumer and VerifiableProducer if they shutdown cleanly

2016-04-20 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-3597:
---

 Summary: Enable query ConsoleConsumer and VerifiableProducer if 
they shutdown cleanly
 Key: KAFKA-3597
 URL: https://issues.apache.org/jira/browse/KAFKA-3597
 Project: Kafka
  Issue Type: Test
Reporter: Anna Povzner
Assignee: Anna Povzner
 Fix For: 0.10.0.0


It would be useful for some tests to check whether ConsoleConsumer and 
VerifiableProducer shut down cleanly or not. 

Add methods to ConsoleConsumer and VerifiableProducer that return true if all 
producers/consumers shut down cleanly, and false otherwise. 
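A sketch of what such a query could look like — a hypothetical Python wrapper, since the real ConsoleConsumer/VerifiableProducer services live in the system-test framework:

```python
class VerifiableTool:
    """Hypothetical wrapper sketching the requested query; not the
    actual system-test service classes."""
    def __init__(self):
        self.exit_codes = []

    def on_exit(self, code):
        # Record each producer/consumer instance's exit status.
        self.exit_codes.append(code)

    def clean_shutdown(self):
        # True only if every instance exited with status 0.
        return bool(self.exit_codes) and all(c == 0 for c in self.exit_codes)
```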





[jira] [Created] (KAFKA-3566) Enable VerifiableProducer and ConsoleConsumer to run with interceptors

2016-04-15 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-3566:
---

 Summary: Enable VerifiableProducer and ConsoleConsumer to run with 
interceptors
 Key: KAFKA-3566
 URL: https://issues.apache.org/jira/browse/KAFKA-3566
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.10.0.0
Reporter: Anna Povzner
Assignee: Anna Povzner


Add interceptor class list and export path list params to VerifiableProducer 
and ConsoleConsumer constructors. This is to allow running VerifiableProducer 
and ConsoleConsumer with interceptors.





[jira] [Created] (KAFKA-3555) Unexpected close of KStreams transformer

2016-04-13 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-3555:
---

 Summary: Unexpected close of KStreams transformer 
 Key: KAFKA-3555
 URL: https://issues.apache.org/jira/browse/KAFKA-3555
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.0.0
Reporter: Anna Povzner
Assignee: Guozhang Wang


I consistently get this behavior when running my system test, which runs a 
1-node Kafka cluster.

We implemented a TransformerSupplier, and the topology is 
transform().filter().map().filter().aggregate(). I have a log message in my 
transformer's close() method. On every run of the test, I see that after 
running for 10-20 seconds, the transformer's close() is called. Then, about 20 
seconds later, the transformer is re-initialized and continues running.

I don't see any exceptions happening in KStreams before close() happens.





[jira] [Commented] (KAFKA-3320) Add successful acks verification to ProduceConsumeValidateTest

2016-03-27 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15213677#comment-15213677
 ] 

Anna Povzner commented on KAFKA-3320:
-

If you look at verifiable_producer.py, it collects all successfully produced 
messages into acked_values. If a producer send() was unsuccessful, those 
messages are collected into not_acked_values. However, our tests do not check 
whether any produce send() got an error. Suppose the test tried to produce 100 
messages, and only 50 were successfully produced. If the consumer successfully 
consumed those 50 messages, the test is considered a success. It would be good 
to verify that we also did not get any produce errors for some tests.
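The proposed check can be sketched as follows, assuming the acked_values/not_acked_values lists described above (the helper name and signature are hypothetical, not the actual ducktape code):

```python
def validate_produce_consume(acked, not_acked, consumed,
                             require_all_acked=False):
    """Return (ok, reason). Besides 'every acked message was consumed',
    optionally require that no send() failed at all."""
    missing = set(acked) - set(consumed)
    if missing:
        return False, "acked but not consumed: %s" % sorted(missing)
    if require_all_acked and not_acked:
        return False, "%d sends failed" % len(not_acked)
    return True, "ok"
```

With the optional flag off, the 100-messages/50-acked scenario above passes; with it on, the failed sends make the test fail.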

> Add successful acks verification to ProduceConsumeValidateTest
> --
>
> Key: KAFKA-3320
> URL: https://issues.apache.org/jira/browse/KAFKA-3320
> Project: Kafka
>  Issue Type: Test
>        Reporter: Anna Povzner
>
> Currently ProduceConsumeValidateTest only validates that each acked message 
> was consumed. Some tests may want an additional verification that all acks 
> were successful.
> This JIRA is to add an additional optional verification that all acks were 
> successful and use it in a couple of tests that need that verification. An 
> example is the compression test.





[jira] [Commented] (KAFKA-3202) Add system test for KIP-31 and KIP-32 - Change message format version on the fly

2016-03-10 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15189917#comment-15189917
 ] 

Anna Povzner commented on KAFKA-3202:
-

[~enothereska] I think you meant to post the test description in KAFKA-3188 
(Compatibility test). Not sure if you meant to pick this JIRA or KAFKA-3188.

> Add system test for KIP-31 and KIP-32 - Change message format version on the 
> fly
> 
>
> Key: KAFKA-3202
> URL: https://issues.apache.org/jira/browse/KAFKA-3202
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Eno Thereska
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> The system test should cover the case that message format changes are made 
> when clients are producing/consuming. The message format change should not 
> cause client side issue.
> We already cover 0.10 brokers with old producers/consumers in upgrade tests. 
> So, the main thing to test is a mix of 0.9 and 0.10 producers and consumers. 
> E.g., test1: 0.9 producer/0.10 consumer and then test2: 0.10 producer/0.9 
> consumer. And then, each of them: compression/no compression (like in upgrade 
> test). And we could probably add another dimension : topic configured with 
> CreateTime (default) and LogAppendTime. So, total 2x2x2 combinations (but 
> maybe can reduce that — eg. do LogAppendTime with compression only).





[jira] [Updated] (KAFKA-3303) Pass partial record metadata to Interceptor onAcknowledgement in case of errors

2016-03-04 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3303:

Status: Patch Available  (was: In Progress)

> Pass partial record metadata to Interceptor onAcknowledgement in case of 
> errors
> ---
>
> Key: KAFKA-3303
> URL: https://issues.apache.org/jira/browse/KAFKA-3303
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Anna Povzner
>    Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Currently Interceptor.onAcknowledgement behaves similarly to Callback. If 
> exception occurred and exception is passed to onAcknowledgement, metadata 
> param is set to null.
> However, it would be useful to pass topic, and partition if available to the 
> interceptor so that it knows which topic/partition got an error.
> This is part of KIP-42.





[jira] [Updated] (KAFKA-3201) Add system test for KIP-31 and KIP-32 - Upgrade Test

2016-03-03 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3201:

Status: Patch Available  (was: In Progress)

Verified that upgrade tests are passing on Jenkins (ran 5 times).

> Add system test for KIP-31 and KIP-32 - Upgrade Test
> 
>
> Key: KAFKA-3201
> URL: https://issues.apache.org/jira/browse/KAFKA-3201
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> This system test should test the procedure to upgrade a Kafka broker from 
> 0.8.x and 0.9.0 to 0.10.0
> The procedure is documented in KIP-32:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message





[jira] [Work started] (KAFKA-3303) Pass partial record metadata to Interceptor onAcknowledgement in case of errors

2016-03-03 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3303 started by Anna Povzner.
---
> Pass partial record metadata to Interceptor onAcknowledgement in case of 
> errors
> ---
>
> Key: KAFKA-3303
> URL: https://issues.apache.org/jira/browse/KAFKA-3303
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Anna Povzner
>    Assignee: Anna Povzner
>Priority: Blocker
> Fix For: 0.10.0.0
>
>
> Currently Interceptor.onAcknowledgement behaves similarly to Callback. If 
> exception occurred and exception is passed to onAcknowledgement, metadata 
> param is set to null.
> However, it would be useful to pass topic, and partition if available to the 
> interceptor so that it knows which topic/partition got an error.
> This is part of KIP-42.





[jira] [Created] (KAFKA-3320) Add successful acks verification to ProduceConsumeValidateTest

2016-03-02 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-3320:
---

 Summary: Add successful acks verification to 
ProduceConsumeValidateTest
 Key: KAFKA-3320
 URL: https://issues.apache.org/jira/browse/KAFKA-3320
 Project: Kafka
  Issue Type: Test
Reporter: Anna Povzner


Currently ProduceConsumeValidateTest only validates that each acked message was 
consumed. Some tests may want an additional verification that all acks were 
successful.

This JIRA is to add an additional optional verification that all acks were 
successful and use it in a couple of tests that need that verification. An 
example is the compression test.





[jira] [Created] (KAFKA-3303) Pass partial record metadata to Interceptor onAcknowledgement in case of errors

2016-02-29 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-3303:
---

 Summary: Pass partial record metadata to Interceptor 
onAcknowledgement in case of errors
 Key: KAFKA-3303
 URL: https://issues.apache.org/jira/browse/KAFKA-3303
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.0.0
Reporter: Anna Povzner
Assignee: Anna Povzner
 Fix For: 0.10.0.0


Currently Interceptor.onAcknowledgement behaves similarly to Callback. If 
exception occurred and exception is passed to onAcknowledgement, metadata param 
is set to null.

However, it would be useful to pass topic, and partition if available to the 
interceptor so that it knows which topic/partition got an error.

This is part of KIP-42.





[jira] [Updated] (KAFKA-3196) KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords

2016-02-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3196:

Status: Patch Available  (was: In Progress)

> KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords
> --
>
> Key: KAFKA-3196
> URL: https://issues.apache.org/jira/browse/KAFKA-3196
> Project: Kafka
>  Issue Type: Improvement
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> This is the second (smaller) part of KIP-42, which includes: Add record size 
> and CRC to RecordMetadata and ConsumerRecord.
> See details in KIP-42 wiki: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>  





[jira] [Updated] (KAFKA-3196) KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords

2016-02-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3196:

Fix Version/s: 0.10.0.0

> KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords
> --
>
> Key: KAFKA-3196
> URL: https://issues.apache.org/jira/browse/KAFKA-3196
> Project: Kafka
>  Issue Type: Improvement
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> This is the second (smaller) part of KIP-42, which includes: Add record size 
> and CRC to RecordMetadata and ConsumerRecord.
> See details in KIP-42 wiki: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>  





[jira] [Updated] (KAFKA-3214) Add consumer system tests for compressed topics

2016-02-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3214:

Fix Version/s: 0.10.0.0

> Add consumer system tests for compressed topics
> ---
>
> Key: KAFKA-3214
> URL: https://issues.apache.org/jira/browse/KAFKA-3214
> Project: Kafka
>  Issue Type: Test
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> As far as I can tell, we don't have any ducktape tests which verify 
> correctness when compression is enabled. If we did, we might have caught 
> KAFKA-3179 earlier.





[jira] [Updated] (KAFKA-3214) Add consumer system tests for compressed topics

2016-02-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-3214:

Status: Patch Available  (was: In Progress)

> Add consumer system tests for compressed topics
> ---
>
> Key: KAFKA-3214
> URL: https://issues.apache.org/jira/browse/KAFKA-3214
> Project: Kafka
>  Issue Type: Test
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> As far as I can tell, we don't have any ducktape tests which verify 
> correctness when compression is enabled. If we did, we might have caught 
> KAFKA-3179 earlier.





[jira] [Work started] (KAFKA-3214) Add consumer system tests for compressed topics

2016-02-23 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3214 started by Anna Povzner.
---
> Add consumer system tests for compressed topics
> ---
>
> Key: KAFKA-3214
> URL: https://issues.apache.org/jira/browse/KAFKA-3214
> Project: Kafka
>  Issue Type: Test
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>
> As far as I can tell, we don't have any ducktape tests which verify 
> correctness when compression is enabled. If we did, we might have caught 
> KAFKA-3179 earlier.





[jira] [Commented] (KAFKA-3201) Add system test for KIP-31 and KIP-32 - Upgrade Test

2016-02-23 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15159416#comment-15159416
 ] 

Anna Povzner commented on KAFKA-3201:
-

Here is a set of tests:

1. Setup:  Producer (0.8) → Kafka Cluster → Consumer (0.8)
First rolling bounce: Set inter.broker.protocol.version = 0.8 and 
message.format.version = 0.8
Second rolling bounce: use the latest (default) inter.broker.protocol.version and 
message.format.version

2. Setup:  Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and 
message.format.version = 0.9
Second rolling bounce: use the latest (default) inter.broker.protocol.version and 
message.format.version

3. Setup:  Producer (0.9) → Kafka Cluster → Consumer (0.9)
First rolling bounce: Set inter.broker.protocol.version = 0.9 and 
message.format.version = 0.9
Second rolling bounce: use inter.broker.protocol.version = 0.10 and 
message.format.version = 0.9

All three tests will produce/consume in the background; at the end, the 
consumed messages will be validated. We will use a mix of producers, with and 
without compression.
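The three setups above can be sketched as data plus a two-bounce driver. This is a hand-written illustration, not the actual upgrade_test.py; the broker objects and their restart() API are hypothetical, and an empty overrides dict stands for "use the latest defaults".

```python
# Each entry: (client version, first-bounce broker overrides,
#              second-bounce broker overrides); {} means latest defaults.
UPGRADE_MATRIX = [
    # test 1: 0.8 producer/consumer
    ("0.8", {"inter.broker.protocol.version": "0.8",
             "message.format.version": "0.8"}, {}),
    # test 2: 0.9 producer/consumer
    ("0.9", {"inter.broker.protocol.version": "0.9",
             "message.format.version": "0.9"}, {}),
    # test 3: 0.9 producer/consumer; the protocol moves to 0.10 but the
    # message format stays at 0.9 (on-the-fly format change is KAFKA-3202)
    ("0.9", {"inter.broker.protocol.version": "0.9",
             "message.format.version": "0.9"},
            {"inter.broker.protocol.version": "0.10",
             "message.format.version": "0.9"}),
]

def rolling_upgrade(brokers, first_bounce, second_bounce):
    """Restart every broker twice, one rolling bounce per config override set."""
    for overrides in (first_bounce, second_bounce):
        for broker in brokers:
            broker.restart(config_overrides=overrides)
```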

[~becket_qin] I understand that the test where the message format changes on 
the fly is KAFKA-3202, so I will not do a third phase in setup 3 to move to the 
0.10 message format (it will be covered in KAFKA-3202). Is my understanding correct?


> Add system test for KIP-31 and KIP-32 - Upgrade Test
> 
>
> Key: KAFKA-3201
> URL: https://issues.apache.org/jira/browse/KAFKA-3201
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> This system test should test the procedure to upgrade a Kafka broker from 
> 0.8.x and 0.9.0 to 0.10.0
> The procedure is documented in KIP-32:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message





[jira] [Work started] (KAFKA-3201) Add system test for KIP-31 and KIP-32 - Upgrade Test

2016-02-22 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3201 started by Anna Povzner.
---
> Add system test for KIP-31 and KIP-32 - Upgrade Test
> 
>
> Key: KAFKA-3201
> URL: https://issues.apache.org/jira/browse/KAFKA-3201
> Project: Kafka
>  Issue Type: Sub-task
>  Components: system tests
>Reporter: Jiangjie Qin
>Assignee: Anna Povzner
> Fix For: 0.10.0.0
>
>
> This system test should test the procedure to upgrade a Kafka broker from 
> 0.8.x and 0.9.0 to 0.10.0
> The procedure is documented in KIP-32:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message





[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-22 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157723#comment-15157723
 ] 

Anna Povzner commented on KAFKA-3256:
-

[~becket_qin] I wrote my comment without seeing yours. Yes, I think the tear 
down timeout failures are unrelated, and I don't think they actually cause any 
issues ([~geoffra] ?). 

I'll take on upgrade and compatibility system tests if you don't mind. 

> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}





[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-22 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157719#comment-15157719
 ] 

Anna Povzner commented on KAFKA-3256:
-

FYI: The upgrade test fails with this error:
java.lang.IllegalArgumentException: requirement failed: message.format.version 
0.10.0-IV0 cannot be used when inter.broker.protocol.version is set to 0.8.2

I think this is expected, right? We need to use the 0.9.0 (or 0.8) message 
format in the first pass of the 0.8 to 0.10 upgrade test (which is what the 
current upgrade test is testing). Is that correct?

> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}





[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-22 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15157604#comment-15157604
 ] 

Anna Povzner commented on KAFKA-3256:
-

[~becket_qin], [~ijuma], [~geoffra] The remaining system tests are 
compatibility test and rolling upgrade tests. The issue is that both tests 
assume trunk to be 0.9. Since we are testing the 0.8-to-0.9 upgrade tests (and 
similarly the compatibility tests) in the 0.9 branch, we don't need to port 
those tests to run against 0.9 vs. trunk. We have separate JIRAs (KAFKA-3201 and 
KAFKA-3188) to add the 0.8-to-0.10 and 0.9-to-0.10 upgrade tests, and to test 
compatibility of a mix of 0.9 and 0.10 clients with 0.10 brokers. My proposal 
is to land a patch with the current fixes, and to address the compatibility and 
upgrade test failures as part of KAFKA-3201 and KAFKA-3188, which are currently 
assigned to me.

> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}





[jira] [Work started] (KAFKA-3196) KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords

2016-02-21 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-3196 started by Anna Povzner.
---
> KIP-42 (part 2): add record size and CRC to RecordMetadata and ConsumerRecords
> --
>
> Key: KAFKA-3196
> URL: https://issues.apache.org/jira/browse/KAFKA-3196
> Project: Kafka
>  Issue Type: Improvement
>        Reporter: Anna Povzner
>    Assignee: Anna Povzner
>
> This is the second (smaller) part of KIP-42, which includes: Add record size 
> and CRC to RecordMetadata and ConsumerRecord.
> See details in KIP-42 wiki: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-20 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155911#comment-15155911
 ] 

Anna Povzner commented on KAFKA-3256:
-

Also, for completeness, the remaining system tests: mirror_maker_test.py, 
zookeeper_security_upgrade_test.py, and upgrade_test.py all use the console 
consumer and set message_validator=is_int. So they all expect the console 
consumer to output integer values, and the additional "CreateTime:" prefix 
breaks that.
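A minimal illustration of the failure mode follows. The is_int below is a simplified stand-in for the validator those tests pass to the console consumer, not the actual kafkatest implementation, and the prefixed output line is an assumed example of the new format.

```python
def is_int(msg):
    """Simplified message validator: return the message parsed as an int,
    or None if it does not parse (the test then treats it as invalid)."""
    try:
        return int(msg)
    except ValueError:
        return None

# Old console consumer output was just the produced value:
assert is_int("42") == 42
# Output prefixed with the timestamp type and timestamp no longer parses
# as an integer, so every consumed message fails validation:
assert is_int("CreateTime:1455979842000\t42") is None
```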


> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}





[jira] [Comment Edited] (KAFKA-3256) Large number of system test failures

2016-02-20 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155910#comment-15155910
 ] 

Anna Povzner edited comment on KAFKA-3256 at 2/21/16 6:18 AM:
--

I looked more into system tests to find where the format is expected, and there 
are several places actually:
1) Connect tests expect the output to be in JSON format. The value is published 
in JSON format, and since the console consumer previously output only the 
value, the test was written to expect JSON output from the console consumer.
2) Other tests such as reassign_partition, compatibility tests that are using 
ConsoleConsumer are setting message_validator=is_int when constructing it 
(because they were expecting only value of type integer in the console consumer 
output). This means that produce_consume_validate.py will be expecting consumer 
output to be an integer.

I actually think unless we want a system test that specifically verifies a 
timestamp, we shouldn't modify existing tests to work with a console consumer 
output containing timestamp type and timestamp. So I agree with your decision 
to not output timestamp type and timestamp by default. If we want to 
write/extend system tests that specifically check for timestamps (the type, or 
validating the timestamp range), then we will use an output with a timestamp.


was (Author: apovzner):
I looked more into system tests to find where the format is expected, and there 
are several places actually:
1) Connect tests expect the output to be in JSON format. The value is published 
in JSON format, and since before the test was expecting the value only, the 
test was written to expect the console consumer output in JSON format.
2) Other tests such as reassign_partition, compatibility tests that are using 
ConsoleConsumer are setting message_validator=is_int when constructing it 
(because they were expecting only value of type integer in the console consumer 
output). This means that produce_consume_validate.py will be expecting consumer 
output to be an integer.

I actually think unless we want a system test that specifically verify a 
timestamp, we shouldn't modify existing tests to work with a console consumer 
output containing timestamp type and timestamp. So I agree with your decision 
to not output timestamp type and timestamp by default. If we want to 
write/extend system tests that specifically checks for timestamps (type or 
validates timestamp range), then we will use an output with a timestamp.

> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}





[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-20 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155910#comment-15155910
 ] 

Anna Povzner commented on KAFKA-3256:
-

I looked more into system tests to find where the format is expected, and there 
are several places actually:
1) Connect tests expect the output to be in JSON format. The value is published 
in JSON format, and since the console consumer previously output only the 
value, the test was written to expect JSON output from the console consumer.
2) Other tests such as reassign_partition, compatibility tests that are using 
ConsoleConsumer are setting message_validator=is_int when constructing it 
(because they were expecting only value of type integer in the console consumer 
output). This means that produce_consume_validate.py will be expecting consumer 
output to be an integer.

I actually think unless we want a system test that specifically verifies a 
timestamp, we shouldn't modify existing tests to work with a console consumer 
output containing timestamp type and timestamp. So I agree with your decision 
to not output timestamp type and timestamp by default. If we want to 
write/extend system tests that specifically check for timestamps (the type, or 
validating the timestamp range), then we will use an output with a timestamp.

> Large number of system test failures
> 
>
> Key: KAFKA-3256
> URL: https://issues.apache.org/jira/browse/KAFKA-3256
> Project: Kafka
>  Issue Type: Bug
>Reporter: Geoff Anderson
>Assignee: Jiangjie Qin
>
> Confluent's nightly run of the kafka system tests reported a large number of 
> failures beginning 2/20/2016
> Test run: 2016-02-19--001.1455897182--apache--trunk--eee9522/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-19--001.1455897182--apache--trunk--eee9522/report.html
> Pass: 136
> Fail: 0
> Test run: 2016-02-20--001.1455979842--apache--trunk--5caa800/
> Link: 
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-02-20--001.1455979842--apache--trunk--5caa800/report.html
> Pass: 72
> Fail: 64
> I.e. trunk@eee9522 was the last passing run, and trunk@5caa800 had a large 
> number of failures.
> Given its complexity, the most likely culprit is 45c8195fa, and I confirmed 
> this is the first commit with failures on a small number of tests.
> [~becket_qin] do you mind investigating?
> {code}
> commit 5caa800e217c6b83f62ee3e6b5f02f56e331b309
> Author: Jun Rao <jun...@gmail.com>
> Date:   Fri Feb 19 09:40:59 2016 -0800
> trivial fix to authorization CLI table
> commit 45c8195fa14c766b200c720f316836dbb84e9d8b
> Author: Jiangjie Qin <becket@gmail.com>
> Date:   Fri Feb 19 07:56:40 2016 -0800
> KAFKA-3025; Added timetamp to Message and use relative offset.
> commit eee95228fabe1643baa016a2d49fb0a9fe2c66bd
> Author: Yasuhiro Matsuda <yasuh...@confluent.io>
> Date:   Thu Feb 18 09:39:30 2016 +0800
> MINOR: remove streams config params from producer/consumer configs
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3256) Large number of system test failures

2016-02-20 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155841#comment-15155841
 ] 

Anna Povzner edited comment on KAFKA-3256 at 2/21/16 1:40 AM:
--

[~becket_qin] The "No JSON object could be decoded" failure is also caused by 
ConsoleConsumer outputting timestamp type and timestamp. If I remove that 
output from ConsoleConsumer, I stop getting this failure. 

Also, once I did that, reassign_partitions_test also started passing.

So, it leads me to believe that there is some common test tool which expects a 
particular *format* of output, rather than individual tests expecting specific 
output.

What was the reason for outputting the timestamp type and timestamp in 
ConsoleConsumer? If it does not bring much value, I propose we just go back to 
the original format of outputting key and value in ConsoleConsumer. I think 
that would fix most of the tests.






[jira] [Commented] (KAFKA-3256) Large number of system test failures

2016-02-20 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15155832#comment-15155832
 ] 

Anna Povzner commented on KAFKA-3256:
-

[~becket_qin] Most of these tests are reproducible locally, so they should be 
easier to debug.

I found the issue with some of the connect tests, which produce output like 
this:
 Expected ["foo", "bar", "baz", "razz", "ma", "tazz"] but saw 
["CreateTime:1455962742782\tfoo", "CreateTime:1455962742789\tbar", 
"CreateTime:1455962742789\tbaz", "CreateTime:1455962758003\trazz", 
"CreateTime:1455962758009\tma", "CreateTime:1455962758009\ttazz"] in Kafka
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/kafka_system_tests/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.3.10-py2.7.egg/ducktape/tests/runner.py",
 line 102, in run_all_tests
result.data = self.run_single_test()
  File 
"/var/lib/jenkins/workspace/kafka_system_tests/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.3.10-py2.7.egg/ducktape/tests/runner.py",
 line 154, in run_single_test
return self.current_test_context.function(self.current_test)
  File 
"/var/lib/jenkins/workspace/kafka_system_tests/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.3.10-py2.7.egg/ducktape/mark/_mark.py",
 line 331, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/var/lib/jenkins/workspace/kafka_system_tests/kafka/tests/kafkatest/tests/connect_test.py",
 line 86, in test_file_source_and_sink
assert expected == actual, "Expected %s but saw %s in Kafka" % (expected, 
actual)
AssertionError: Expected ["foo", "bar", "baz", "razz", "ma", "tazz"] but saw 
["CreateTime:1455962742782\tfoo", "CreateTime:1455962742789\tbar", 
"CreateTime:1455962742789\tbaz", "CreateTime:1455962758003\trazz", 
"CreateTime:1455962758009\tma", "CreateTime:1455962758009\ttazz"] in Kafka

ConsoleConsumer was changed to also output the timestamp type and timestamp value 
in addition to the key/value. However, it looks like the connect tests expect 
output with just the key and value. See test_file_source_and_sink in 
connect_test.py for an example.
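If the tests were instead adapted to the new format, they would need to drop the tab-separated "TimestampType:timestamp" prefix before comparing values. A hypothetical helper (not actual connect_test.py code):

```python
def strip_timestamp_prefix(line):
    """Drop a leading '<TimestampType>:<timestamp>' field (tab-separated)
    from a console-consumer line, if present; otherwise return the line."""
    prefix, sep, rest = line.partition("\t")
    if sep and prefix.split(":", 1)[0] in ("CreateTime", "LogAppendTime"):
        return rest
    return line

actual = ["CreateTime:1455962742782\tfoo",
          "CreateTime:1455962742789\tbar",
          "baz"]  # lines without a prefix pass through unchanged
print([strip_timestamp_prefix(l) for l in actual])  # -> ['foo', 'bar', 'baz']
```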




