Re: [DISCUSS] KIP-405 + KAFKA-7739 - Implementation of Tiered Storage Integration with Azure Storage

2021-07-12 Thread Sumant Tambe
Hi Israel,

LinkedIn is interested in evaluating KIP-405 for HDFS and S3 in the short
term and Azure Blob Storage in the long run. You may already know that LinkedIn
is migrating to Azure. We think that Azure Blobs will provide us with the
optimal cost/availability/operability trade-offs for Kafka in Azure.
What's the context behind your interest in KIP-405 and Azure Blobs? Do you
have any data or experience from using Azure Blobs at scale?

Satish, any word on the KIP-405 and the RSM implementations?

Regards,
Sumant
Kafka Dev, LinkedIn

On Mon, 15 Mar 2021 at 09:08, Satish Duggana 
wrote:

> Hi Israel,
> Thanks for your interest in tiered storage. As mentioned by Jun earlier, we
> decided not to have any implementations in Apache Kafka repo like Kafka
> connectors. We plan to have RSM implementations for HDFS, S3, GCP, and
> Azure storages in a separate repo. We will let you know once they are ready
> for review.
>
> Best,
> Satish.
>
> On Sat, 13 Mar 2021 at 01:27, Israel Ekpo  wrote:
>
> > Thanks @Jun for the prompt response.
> >
> > That's ok and I think it is a great strategy just like the Connect
> > ecosystem.
> >
> > However, I am still in search of repos and samples that demonstrate
> > implementations of the KIP.
> >
> > I will keep searching but was just wondering if there were sample
> > implementations for S3 or HDFS I could take a look at.
> >
> > Thanks.
> >
> > On Fri, Mar 12, 2021 at 2:19 PM Jun Rao 
> wrote:
> >
> > > Hi, Israel,
> > >
> > > Thanks for your interest. As part of KIP-405, we have made the decision
> > not
> > > to host any plugins for external remote storage directly in Apache
> Kafka.
> > > Those plugins could be hosted outside of Apache Kafka.
> > >
> > > Jun
> > >
> > > On Thu, Mar 11, 2021 at 5:15 PM Israel Ekpo 
> > wrote:
> > >
> > > > Thanks Satish, Sriharsha, Suresh and Ying for authoring this KIP and
> > > thanks
> > > > to everyone that participated in the review and discussion to take it
> > to
> > > > where it is today.
> > > >
> > > > I would like to contribute by working on integrating Azure Storage
> > (Blob
> > > > and ADLS) with Tiered Storage for this KIP
> > > >
> > > > I have created this issue to track this work
> > > > https://issues.apache.org/jira/browse/KAFKA-12458
> > > >
> > > > Are there any sample implementations for HDFS/S3 that I can reference
> > to
> > > > get started?
> > > >
> > > > When you have a moment, please share.
> > > >
> > > > Thanks.
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-03-21 Thread Sumant Tambe (JIRA)
Sumant Tambe created KAFKA-6701:
---

 Summary: synchronize Log modification between delete cleanup and 
async delete
 Key: KAFKA-6701
 URL: https://issues.apache.org/jira/browse/KAFKA-6701
 Project: Kafka
  Issue Type: Bug
Reporter: Sumant Tambe
Assignee: Sumant Tambe


Kafka broker crashes without any evident disk failures 

From [~becket_qin]: This looks like a bug in Kafka when topic deletion and log
retention cleanup happen at the same time: the log retention cleanup may see a
ClosedChannelException after the log has been renamed for async deletion.

The root cause is that topic deletion should have set the isClosed flag of
the partition log to true, and retention should not attempt to delete old log
segments when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-5886) Implement KIP-91

2017-09-13 Thread Sumant Tambe (JIRA)
Sumant Tambe created KAFKA-5886:
---

 Summary: Implement KIP-91
 Key: KAFKA-5886
 URL: https://issues.apache.org/jira/browse/KAFKA-5886
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Sumant Tambe
Assignee: Sumant Tambe
 Fix For: 1.0.0


Implement 
[KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [VOTE] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-11 Thread Sumant Tambe
@Ted, we throw a ConfigException when the user-configured values of linger.ms,
request.timeout.ms, and retry.backoff.ms add up to more than delivery.timeout.ms.
The KIP mentions this in the Validation section.
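
For reference, a minimal sketch of that validation (a hypothetical helper,
not the actual KafkaProducer config-validation code):

  import org.apache.kafka.common.config.ConfigException;

  // Sketch of the check described above.
  static void validateDeliveryTimeout(long deliveryTimeoutMs, long lingerMs,
                                      long requestTimeoutMs, long retryBackoffMs) {
      if (deliveryTimeoutMs < lingerMs + requestTimeoutMs + retryBackoffMs) {
          throw new ConfigException("delivery.timeout.ms should be at least "
              + "linger.ms + request.timeout.ms + retry.backoff.ms");
      }
  }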

On 11 September 2017 at 14:31, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. larger than default (linger.ms + request.timeout.ms + retry.backoff.ms
> )
>
> I was not referring to the sum of default values for the above parameters.
> I was referring to the sum of user configured values for these parameters
> (since we don't know whether that sum is higher than 120 seconds or not) .
>
> On Mon, Sep 11, 2017 at 10:06 AM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > @Jun, Until we make idempotent producer the default (kip-185), this kip
> is
> > sensitive to retries. I.e., we expire batches either delivery.timeout.ms
> > passes or all retries are exhausted, whichever comes first. In cases
> where
> > retries exhaust first due to linger.ms + retries * (request.timeout.ms +
> > retry.backoff.ms) being much smaller than delivery.timeout.ms, multiple
> > failed requests (due to pipelining) will cause batches to expire
> > out-of-order. Right?
> >
> > @Ted, The idea is to have the default value of delivery.timeout.ms=120
> > sec,
> > which is much larger than default (linger.ms + request.timeout.ms +
> > retry.backoff.ms). If a user configures them incorrectly, report a
> > ConfigException.
> >
> >
> > On 11 September 2017 at 09:11, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Sumant,
> > >
> > > Thanks for the KIP. +1.
> > >
> > > Just a minor clarification. The KIP says "Batches expire in order
> > > when max.in.flight.request.per.connection==1". Is that true? It seems
> > that
> > > even with max.in.flight.request.per.connection > 1, batches should
> still
> > > expire in order.
> > >
> > > Jun
> > >
> > > On Sat, Sep 9, 2017 at 6:15 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> > >
> > > > +1 for the KIP.
> > > >
> > > > For delivery.timeout.ms , since it should be >= linger.ms +
> > > > request.timeout.ms + retry.backoff.ms , it seems the default value
> > > should
> > > > be max(120 seconds, linger.ms + request.timeout.ms +
> retry.backoff.ms
> > ).
> > > >
> > > > Cheers
> > > >
> > > > On Fri, Sep 8, 2017 at 2:04 AM, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > > >
> > > > > Thanks for the KIP. +1 (binding) from me. Just a minor suggestion,
> I
> > > > would
> > > > > mention the following under "Public Interfaces":
> > > > >
> > > > > Default value of delivery.timeout.ms = 120 seconds
> > > > > Default value of retries will be changed to MAX_INT
> > > > > request.timeout.ms – current meaning, but messages are not expired
> > > after
> > > > > this time. I.e., request.timeout.ms is no longer relevant for
> batch
> > > > > expiry.
> > > > >
> > > > > The compatibility impact of such changes can remain in the
> > > compatibility
> > > > > section. Also, I agree about keeping your "reordering" text
> although
> > it
> > > > > seems like the wiki wasn't updated to match what you posted in the
> > > > > discussion thread.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Fri, Sep 8, 2017 at 6:06 AM, Sumant Tambe <suta...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I would like to open the vote for KIP-91:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+
> > > > > > Provide+Intuitive+User+Timeouts+in+The+Producer
> > > > > >
> > > > > > Thank you all for your input on the kip so far.
> > > > > >
> > > > > > Regards,
> > > > > > Sumant
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-11 Thread Sumant Tambe
@Jun, Until we make the idempotent producer the default (KIP-185), this KIP is
sensitive to retries. That is, we expire batches either when delivery.timeout.ms
passes or when all retries are exhausted, whichever comes first. In cases where
retries exhaust first, due to linger.ms + retries * (request.timeout.ms +
retry.backoff.ms) being much smaller than delivery.timeout.ms, multiple
failed requests (due to pipelining) will cause batches to expire
out of order. Right?

@Ted, The idea is to have a default value of delivery.timeout.ms=120 seconds,
which is much larger than the default (linger.ms + request.timeout.ms +
retry.backoff.ms). If a user configures them incorrectly, we report a
ConfigException.
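
As a rough illustration of the "whichever comes first" rule (a sketch only;
the names below are made up, not the actual Sender/RecordAccumulator code):

  // Hypothetical batch-expiry check: a batch expires when delivery.timeout.ms
  // elapses or when retries are exhausted, whichever happens first.
  static boolean shouldExpire(long nowMs, long batchCreatedMs, int attempts,
                              long deliveryTimeoutMs, int retries) {
      boolean deliveryTimedOut = nowMs - batchCreatedMs >= deliveryTimeoutMs;
      boolean retriesExhausted = attempts > retries;  // attempts includes the initial send
      return deliveryTimedOut || retriesExhausted;
  }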


On 11 September 2017 at 09:11, Jun Rao <j...@confluent.io> wrote:

> Hi, Sumant,
>
> Thanks for the KIP. +1.
>
> Just a minor clarification. The KIP says "Batches expire in order
> when max.in.flight.request.per.connection==1". Is that true? It seems that
> even with max.in.flight.request.per.connection > 1, batches should still
> expire in order.
>
> Jun
>
> On Sat, Sep 9, 2017 at 6:15 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> > +1 for the KIP.
> >
> > For delivery.timeout.ms , since it should be >= linger.ms +
> > request.timeout.ms + retry.backoff.ms , it seems the default value
> should
> > be max(120 seconds, linger.ms + request.timeout.ms + retry.backoff.ms).
> >
> > Cheers
> >
> > On Fri, Sep 8, 2017 at 2:04 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Thanks for the KIP. +1 (binding) from me. Just a minor suggestion, I
> > would
> > > mention the following under "Public Interfaces":
> > >
> > > Default value of delivery.timeout.ms = 120 seconds
> > > Default value of retries will be changed to MAX_INT
> > > request.timeout.ms – current meaning, but messages are not expired
> after
> > > this time. I.e., request.timeout.ms is no longer relevant for batch
> > > expiry.
> > >
> > > The compatibility impact of such changes can remain in the
> compatibility
> > > section. Also, I agree about keeping your "reordering" text although it
> > > seems like the wiki wasn't updated to match what you posted in the
> > > discussion thread.
> > >
> > > Ismael
> > >
> > > On Fri, Sep 8, 2017 at 6:06 AM, Sumant Tambe <suta...@gmail.com>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I would like to open the vote for KIP-91:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+
> > > > Provide+Intuitive+User+Timeouts+in+The+Producer
> > > >
> > > > Thank you all for your input on the kip so far.
> > > >
> > > > Regards,
> > > > Sumant
> > > >
> > >
> >
>


Re: [VOTE] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-08 Thread Sumant Tambe
Updated.

On 8 September 2017 at 02:04, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks for the KIP. +1 (binding) from me. Just a minor suggestion, I would
> mention the following under "Public Interfaces":
>
> Default value of delivery.timeout.ms = 120 seconds
> Default value of retries will be changed to MAX_INT
> request.timeout.ms – current meaning, but messages are not expired after
> this time. I.e., request.timeout.ms is no longer relevant for batch
> expiry.
>
> The compatibility impact of such changes can remain in the compatibility
> section. Also, I agree about keeping your "reordering" text although it
> seems like the wiki wasn't updated to match what you posted in the
> discussion thread.
>
> Ismael
>
> On Fri, Sep 8, 2017 at 6:06 AM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Hi all,
> >
> > I would like to open the vote for KIP-91:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+
> > Provide+Intuitive+User+Timeouts+in+The+Producer
> >
> > Thank you all for your input on the kip so far.
> >
> > Regards,
> > Sumant
> >
>


Re: [VOTE] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-07 Thread Sumant Tambe
I added the following: Due to the change in the default value of retries from 0
to MAX_INT and the existing default value of
max.in.flight.requests.per.connection==5, reordering becomes a possibility
by default. To prevent reordering, set
max.in.flight.requests.per.connection==1.

It doesn't hurt to mention it, since it's a default behavior change, right?
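
For example, a producer that must preserve ordering under the new defaults
could be configured along these lines (a sketch; only the last setting
differs from the defaults discussed above):

  Properties props = new Properties();
  // With retries defaulting to MAX_INT, a retried batch can leapfrog a newer
  // one unless at most one request is in flight per connection.
  props.put("retries", String.valueOf(Integer.MAX_VALUE));    // new default
  props.put("max.in.flight.requests.per.connection", "1");    // prevents reordering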

On 7 September 2017 at 22:15, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for the KIP Sumant, +1 from me.
>
> That is the most exhaustive 'Rejected Alternatives' section that I have
> seen :)
>
> One minor point: In the compatibility section, your note on
> 'max.in.flight.requests.per.connection == 5' resulting in out of order
> delivery is true irrespective of these changes. As such, I don't think it
> should be mentioned in the context of this KIP.
>
> Thanks,
> Apurva
>
> On Thu, Sep 7, 2017 at 10:06 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Hi all,
> >
> > I would like to open the vote for KIP-91:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 91+Provide+Intuitive+User+Timeouts+in+The+Producer
> >
> > Thank you all for your input on the kip so far.
> >
> > Regards,
> > Sumant
> >
>


Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-07 Thread Sumant Tambe
Just did :)

On 7 September 2017 at 17:52, Ismael Juma <ism...@juma.me.uk> wrote:

> Can we please start the vote on this KIP? The KIP must be accepted by next
> Wednesday in order to make the cut for 1.0.0. This issue keeps coming up
> again and again, and I'd really like to include a fix for 1.0.0.
>
> Ismael
>
> On Thu, Sep 7, 2017 at 10:01 PM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > I agree with what Ismael said. Having both retries and
> delivery.timeout.ms
> > is confusing, and thus the goal is to not have a retries option at all
> once
> > idempotence is fully battle tested and has become the entrenched default.
> >
> > Until that time, it makes sense to expire batch earlier than
> > delivery.timeout.ms if retries have been exhausted.
> >
> > Thanks,
> > Apurva
> >
> >
> > On Thu, Sep 7, 2017 at 6:07 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Good question regarding retries Sumant. A few comments:
> > >
> > > 1. Defaulting to MAX_INT makes sense in the context of
> > delivery.timeout.ms
> > > ,
> > > but introduces the possibility of reordering with the default
> > max.in.flight
> > > of 5. Personally, I think reordering is better than dropping the
> message
> > > altogether (if we keep retries=0), but it's worth noting this.
> > >
> > > 2. I agree that we should expire on whichever of retries and
> > > delivery.timeout.ms is exhausted first for 1.0.0.
> > >
> > > 3. Once KIP-185 lands (post 1.0.0), we should consider deprecating and
> > > eventually removing the retries config to simplify things (it won't
> have
> > > much use then).
> > >
> > > 4. With regards to the case where the broker replies quickly with an
> > error,
> > > we need to understand a bit more what the error is. For any kind of
> > > connection issue, we now have exponential backoff. For the case where
> an
> > > error code is returned, it depends on whether the error is retriable or
> > > not. For the former, it probably makes sense to keep retrying as it's
> > > supposed to be a transient issue. If we think it would make sense to
> > apply
> > > exponential backoff, we could also consider that. So, I'm not sure
> > retries
> > > has much use apart from compatibility and the retries=0 case (for now).
> > >
> > > Ismael
> > >
> > > On Wed, Sep 6, 2017 at 11:14 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Sumant,
> > > >
> > > > The diagram in the wiki seems to imply that delivery.timeout.ms
> > doesn't
> > > > include the batching time.
> > > >
> > > > For retries, probably we can just default it to MAX_INT?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Sep 6, 2017 at 10:26 AM, Sumant Tambe <suta...@gmail.com>
> > wrote:
> > > >
> > > > > 120 seconds default sounds good to me. Throwing ConfigException
> > instead
> > > > of
> > > > > WARN is fine. Added clarification that the producer waits the full
> > > > > request.timeout.ms for the in-flight request. This implies that
> user
> > > > might
> > > > > be notified of batch expiry while a batch is still in-flight.
> > > > >
> > > > > I don't recall if we discussed our point of view that existing
> > configs
> > > > like
> > > > > retries become redundant/deprecated with this feature. IMO, retries
> > > > config
> > > > > becomes meaningless due to the possibility of incorrect configs
> like
> > > > > delivery.timeout.ms > linger.ms + retries * (request..timeout.ms +
> > > > > retry.backoff.ms), retries should be basically interpreted as
> > MAX_INT?
> > > > > What
> > > > > will be the default?
> > > > >
> > > > > So do we ignore retries config or throw a ConfigException if
> > weirdness
> > > > like
> > > > > above is detected?
> > > > >
> > > > > -Sumant
> > > > >
> > > > >
> > > > > On 5 September 2017 at 17:34, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > > > >
> > > > > > Thanks for updating the KIP, Sumant. A couple of points:
> > > > > >
> > > > > > 1. I

[VOTE] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-07 Thread Sumant Tambe
Hi all,

I would like to open the vote for KIP-91:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

Thank you all for your input on the kip so far.

Regards,
Sumant


Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-06 Thread Sumant Tambe
I'm not sure whether it's a good idea to have two different ways to control
expiration. One option, as you suggested, is to expire batches based on
whichever happens first (delivery.timeout.ms is exceeded or retries are
exhausted). The second option is to effectively ignore retries even if it's set
to a very low value, i.e., treat it as MAX_INT regardless of the user config.

On second thought, expiring after retries may make sense in cases where the
broker responds *quickly* with some exception to produce requests. From the
client's point of view, the elapsed time is then only about retry.backoff.ms *
retries, as opposed to the much slower (request.timeout.ms + retry.backoff.ms) *
retries when a broker is down or there's a network problem. In the former case,
a delivery.timeout.ms of 120 seconds may amount to hundreds of retries, which
feels quite unnecessary.
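
To put a rough number on that (assuming the default retry.backoff.ms of 100 ms
and near-instant error responses from the broker): 120,000 ms / ~100 ms per
attempt is roughly 1,200 attempts, i.e., hundreds of retries within a single
delivery.timeout.ms.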

-Sumant

On 6 September 2017 at 21:27, Becket Qin <becket@gmail.com> wrote:

> Hey Sumant,
>
> I agree with Jun that we can set the default value of retries to MAX_INT.
>
> Initially I was also thinking that retries can be deprecated. But after a
> second thought, I feel it may not be necessary to deprecate retries. With
> the newly added delivery.timeout.ms, the producer will expire a batch
> either when delivery.timeout.ms is hit or when retries has exhausted,
> whichever comes first. Different users may choose different flavor. It is
> introducing one more option, but seems reasonable, in some cases, users may
> want to at least retry once before expire a batch.
>
> Thanks,
>
> JIangjie (Becket) Qin
>
> On Wed, Sep 6, 2017 at 3:14 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Sumant,
> >
> > The diagram in the wiki seems to imply that delivery.timeout.ms doesn't
> > include the batching time.
> >
> > For retries, probably we can just default it to MAX_INT?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Sep 6, 2017 at 10:26 AM, Sumant Tambe <suta...@gmail.com> wrote:
> >
> > > 120 seconds default sounds good to me. Throwing ConfigException instead
> > of
> > > WARN is fine. Added clarification that the producer waits the full
> > > request.timeout.ms for the in-flight request. This implies that user
> > might
> > > be notified of batch expiry while a batch is still in-flight.
> > >
> > > I don't recall if we discussed our point of view that existing configs
> > like
> > > retries become redundant/deprecated with this feature. IMO, retries
> > config
> > > becomes meaningless due to the possibility of incorrect configs like
> > > delivery.timeout.ms > linger.ms + retries * (request..timeout.ms +
> > > retry.backoff.ms), retries should be basically interpreted as MAX_INT?
> > > What
> > > will be the default?
> > >
> > > So do we ignore retries config or throw a ConfigException if weirdness
> > like
> > > above is detected?
> > >
> > > -Sumant
> > >
> > >
> > > On 5 September 2017 at 17:34, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > Thanks for updating the KIP, Sumant. A couple of points:
> > > >
> > > > 1. I think the default for delivery.timeout.ms should be higher than
> > 30
> > > > seconds given that we previously would reset the clock once the batch
> > was
> > > > sent. The value should be large enough that batches are not expired
> due
> > > to
> > > > expected events like a new leader being elected due to broker
> failure.
> > > > Would it make sense to use a conservative value like 120 seconds?
> > > >
> > > > 2. The producer currently throws an exception for configuration
> > > > combinations that don't make sense. We should probably do the same
> here
> > > for
> > > > consistency (the KIP currently proposes a log warning).
> > > >
> > > > 3. We should mention that we will not cancel in flight requests until
> > the
> > > > request timeout even though we'll expire the batch early if needed.
> > > >
> > > > I think we should start the vote tomorrow so that we have a chance of
> > > > hitting the KIP freeze for 1.0.0.
> > > >
> > > > Ismael
> > > >
> > > > On Wed, Sep 6, 2017 at 1:03 AM, Sumant Tambe <suta...@gmail.com>
> > wrote:
> > > >
> > > > > I've updated the kip-91 writeup
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > 91+Provide+Intuitive+User+Timeouts+in+The+Producer>
> > > > > to 

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-06 Thread Sumant Tambe
120 seconds as the default sounds good to me. Throwing a ConfigException instead
of a WARN is fine. I added a clarification that the producer waits the full
request.timeout.ms for the in-flight request. This implies that a user might
be notified of batch expiry while a batch is still in flight.

I don't recall if we discussed our point of view that existing configs like
retries become redundant/deprecated with this feature. IMO, the retries config
becomes meaningless due to the possibility of incorrect configs like
delivery.timeout.ms > linger.ms + retries * (request.timeout.ms +
retry.backoff.ms). Should retries basically be interpreted as MAX_INT? What
will be the default?

So do we ignore the retries config, or throw a ConfigException if weirdness
like the above is detected?
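
As a concrete example of such weirdness (assuming linger.ms=0,
request.timeout.ms=30000, retry.backoff.ms=100, and a legacy-style retries=3):
the retry budget is 0 + 3 * (30000 + 100) = 90,300 ms, well below a
delivery.timeout.ms of 120,000 ms, so the batch would fail on retry exhaustion
long before delivery.timeout.ms is reached.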

-Sumant


On 5 September 2017 at 17:34, Ismael Juma <ism...@juma.me.uk> wrote:

> Thanks for updating the KIP, Sumant. A couple of points:
>
> 1. I think the default for delivery.timeout.ms should be higher than 30
> seconds given that we previously would reset the clock once the batch was
> sent. The value should be large enough that batches are not expired due to
> expected events like a new leader being elected due to broker failure.
> Would it make sense to use a conservative value like 120 seconds?
>
> 2. The producer currently throws an exception for configuration
> combinations that don't make sense. We should probably do the same here for
> consistency (the KIP currently proposes a log warning).
>
> 3. We should mention that we will not cancel in flight requests until the
> request timeout even though we'll expire the batch early if needed.
>
> I think we should start the vote tomorrow so that we have a chance of
> hitting the KIP freeze for 1.0.0.
>
> Ismael
>
> On Wed, Sep 6, 2017 at 1:03 AM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > I've updated the kip-91 writeup
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 91+Provide+Intuitive+User+Timeouts+in+The+Producer>
> > to capture some of the discussion here. Please confirm if it's
> sufficiently
> > accurate. Feel free to edit it if you think some explanation can be
> better
> > and has been agreed upon here.
> >
> > How do you proceed from here?
> >
> > -Sumant
> >
> > On 30 August 2017 at 12:59, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Jiangjie,
> > >
> > > I mis-understood Jason's approach earlier. It does seem to be a good
> one.
> > > We still need to calculate the selector timeout based on the remaining
> > > delivery.timeout.ms to call the callback on time, but we can always
> wait
> > > for an inflight request based on request.timeout.ms.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Aug 29, 2017 at 5:16 PM, Becket Qin <becket@gmail.com>
> > wrote:
> > >
> > > > Yeah, I think expiring a batch but still wait for the response is
> > > probably
> > > > reasonable given the result is not guaranteed anyways.
> > > >
> > > > @Jun,
> > > >
> > > > I think the frequent PID reset may still be possible if we do not
> wait
> > > for
> > > > the in-flight response to return. Consider two partitions p0 and p1,
> > the
> > > > deadline of the batches for p0 are T + 10, T + 30, T + 50... The
> > deadline
> > > > of the batches for p1 are T + 20, T + 40, T + 60... Assuming each
> > request
> > > > takes more than 10 ms to get the response. The following sequence may
> > be
> > > > possible:
> > > >
> > > > T: PID0 send batch0_p0(PID0), batch0_p1(PID0)
> > > > T + 10: PID0 expires batch0_p0(PID0), without resetting PID, sends
> > > > batch1_p0(PID0) and batch0_p1(PID0, retry)
> > > > T + 20: PID0 expires batch0_p1(PID0, retry), resets the PID to PID1,
> > > sends
> > > > batch1_p0(PID0, retry) and batch1_p1(PID1)
> > > > T + 30: PID1 expires batch1_p0(PID0, retry), without resetting PID,
> > sends
> > > > batch2_p0(PID1) and batch1_p1(PID1, retry)
> > > > T + 40: PID1 expires batch1_p1(PID1, retry), resets the PID to PID2,
> > > sends
> > > > batch2_p0(PID1, retry) and sends batch2_p1(PID2)
> > > > 
> > > >
> > > > In the above example, the producer will reset PID once every two
> > > requests.
> > > > The example did not take retry backoff into consideration, but it
> still
> > > > seems possible to encounter frequent PID reset if we do not wait for
> > the
>

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-09-05 Thread Sumant Tambe
> > > > little
> > > > > non-intuitive to the users. Arguably it maybe OK though because
> > > currently
> > > > > when TimeoutException is thrown, there is no guarantee whether the
> > > > messages
> > > > > are delivered or not.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Tue, Aug 29, 2017 at 12:33 PM, Jason Gustafson <
> > ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > I think I'm with Becket. We should wait for request.timeout.ms
> for
> > > > each
> > > > > > produce request we send. We can still await the response
> internally
> > > for
> > > > > > PID/sequence maintenance even if we expire the batch from the
> > user's
> > > > > > perspective. New sequence numbers would be assigned based on the
> > > > current
> > > > > > PID until the response returns and we find whether a PID reset is
> > > > > actually
> > > > > > needed. This makes delivery.timeout.ms a hard limit which is
> > easier
> > > to
> > > > > > explain.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Tue, Aug 29, 2017 at 11:10 AM, Sumant Tambe <
> suta...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > I'm updating the kip-91 writeup. There seems to be some
> confusion
> > > > about
> > > > > > > expiring an inflight request. An inflight request gets a full
> > > > > > > delivery.timeout.ms duration from creation, right? So it
> should
> > be
> > > > > > > max(remaining delivery.timeout.ms, request.timeout.ms)?
> > > > > > >
> > > > > > > Jun, we do want to wait for an inflight request for longer than
> > > > > > > request.timeout.ms. right?
> > > > > > >
> > > > > > > What happens to a batch when retries * (request.timeout.ms +
> > > > > > > retry.backoff.ms) < delivery.timeout.ms  and all retries are
> > > > > > exhausted?  I
> > > > > > > remember an internal discussion where we concluded that retries
> > can
> > > > be
> > > > > no
> > > > > > > longer relevant (i.e., ignored, which is same as
> > retries=MAX_LONG)
> > > > when
> > > > > > > there's an end-to-end delivery.timeout.ms. Do you agree?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Sumant
> > > > > > >
> > > > > > > On 27 August 2017 at 12:08, Jun Rao <j...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Hi, Jiangjie,
> > > > > > > >
> > > > > > > > If we want to enforce delivery.timeout.ms, we need to take
> the
> > > min
> > > > > > > right?
> > > > > > > > Also, if a user sets a large delivery.timeout.ms, we
> probably
> > > > don't
> > > > > > want
> > > > > > > > to
> > > > > > > > wait for an inflight request longer than request.timeout.ms.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > > On Fri, Aug 25, 2017 at 5:19 PM, Becket Qin <
> > > becket@gmail.com>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jason,
> > > > > > > > >
> > > > > > > > > I see what you mean. That makes sense. So in the above case
> > > after
> > > > > the
> > > > > > > > > producer resets PID, when it retry batch_0_tp1, the batch
> > will
> > > > > still
> > > > > > > have
> > > > > > > > > the old PID even if the producer has already got a new PID.
> > > > > > > > >
> > > > > > > > > @Jun, do you mean max(remaining delivery.timeout.ms,
> > > > > 

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-29 Thread Sumant Tambe
IRA to try and
> improve
> > > > this.
> > > > >
> > > > > So if we expire the batch prematurely and resend all
> > > > > > the other batches in the same request, chances are there will be
> > > > > > duplicates. If we wait for the response instead, it is less
> likely
> > to
> > > > > > introduce duplicates, and we may not need to reset the PID.
> > > > >
> > > > >
> > > > > Not sure I follow this. Are you assuming that we change the batch
> > > > > PID/sequence of the retried batches after resetting the PID? I
> think
> > we
> > > > > probably need to ensure that when we retry a batch, we always use
> the
> > > > same
> > > > > PID/sequence.
> > > > >
> > > > > By the way, as far as naming, `max.message.delivery.wait.ms` is
> > quite
> > > a
> > > > > mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > If TCP timeout is longer than request.timeout.ms, the producer
> > will
> > > > > always
> > > > > > hit request.timeout.ms before hitting TCP timeout, right? That
> is
> > > why
> > > > we
> > > > > > added request.timeout.ms in the first place.
> > > > > >
> > > > > > You are right. Currently we are reset the PID and resend the
> > batches
> > > to
> > > > > > avoid OutOfOrderSequenceException when the expired batches are in
> > > > retry.
> > > > > >
> > > > > > This does not distinguish the reasons that caused the retry.
> There
> > > are
> > > > > two
> > > > > > cases:
> > > > > > 1. If the batch was in retry because it received an error
> response
> > > > (e.g.
> > > > > > NotLeaderForPartition), we actually don't need to reset PID in
> this
> > > > case
> > > > > > because we know that broker did not accept it.
> > > > > > 2. If the batch was in retry because it hit a timeout earlier,
> then
> > > we
> > > > > > should reset the PID (or optimistically send and only reset PID
> > when
> > > > > > receive OutOfOrderSequenceException?)
> > > > > > Case 1 is probably the most common case, so it looks that we are
> > > > > resetting
> > > > > > the PID more often than necessary. But because in case 1 the
> broker
> > > > does
> > > > > > not have the batch, there isn't much impact on resting PID and
> > resend
> > > > > other
> > > > > > than the additional round trip.
> > > > > >
> > > > > > Now we are introducing another case:
> > > > > > 3. A batch is in retry because we expired an in-flight request
> > before
> > > > it
> > > > > > hits request.timeout.ms.
> > > > > >
> > > > > > The difference between 2 and 3 is that in case 3 likely the
> broker
> > > has
> > > > > > appended the messages. So if we expire the batch prematurely and
> > > resend
> > > > > all
> > > > > > the other batches in the same request, chances are there will be
> > > > > > duplicates. If we wait for the response instead, it is less
> likely
> > to
> > > > > > introduce duplicates, and we may not need to reset the PID.
> > > > > >
> > > > > > That said, given that batch expiration is probably already rare
> > > enough,
> > > > > so
> > > > > > it may not be necessary to optimize for that.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Hi, Becket,
> > > > > > >
> > > > > > > If a message expires 

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-23 Thread Sumant Tambe
; the
> >>> batch is sendable (i.e., broker is available, inflight request limit is
> >>> not
> >>> exceeded, etc). That way, the producer has more chance for batching.
> The
> >>> implication is that a batch could be closed longer than linger.ms.
> >>>
> >>> Now, on your concern about not having a precise way to control delay in
> >>> the
> >>> accumulator. It seems the batch.expiry.ms approach will have the same
> >>> issue. If you start the clock when a batch is initialized, you may
> expire
> >>> some messages in the same batch early than batch.expiry.ms. If you
> start
> >>> the clock when the batch is closed, the expiration time could be
> >>> unbounded
> >>> because of the linger.ms implementation described above. Starting the
> >>> expiration clock on batch initialization will at least guarantee the
> time
> >>> to expire the first message is precise, which is probably good enough.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>>
> >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <suta...@gmail.com>
> wrote:
> >>>
> >>> > Question about "the closing of a batch can be delayed longer than
> >>> > linger.ms":
> >>> > Is it possible to cause an indefinite delay? At some point bytes
> limit
> >>> > might kick in. Also, why is closing of a batch coupled with
> >>> availability of
> >>> > its destination? In this approach a batch chosen for eviction due to
> >>> delay
> >>> > needs to "close" anyway, right (without regards to destination
> >>> > availability)?
> >>> >
> >>> > I'm not too worried about notifying at super-exact time specified in
> >>> the
> >>> > configs. But expiring before the full wait-span has elapsed sounds a
> >>> little
> >>> > weird. So expiration time has a +/- spread. It works more like a hint
> >>> than
> >>> > max. So why not message.delivery.wait.hint.ms?
> >>> >
> >>> > Yeah, cancellable future will be similar in complexity.
> >>> >
> >>> > I'm unsure if max.message.delivery.wait.ms will the final nail for
> >>> > producer
> >>> > timeouts. We still won't have a precise way to control delay in just
> >>> the
> >>> > accumulator segment. batch.expiry.ms does not try to abstract. It's
> >>> very
> >>> > specific.
> >>> >
> >>> > My biggest concern at the moment is implementation complexity.
> >>> >
> >>> > At this state, I would like to encourage other independent opinions.
> >>> >
> >>> > Regards,
> >>> > Sumant
> >>> >
> >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:
> >>> >
> >>> > > Hi, Sumant,
> >>> > >
> >>> > > 1. Yes, it's probably reasonable to require
> >>> max.message.delivery.wait.ms
> >>> > >
> >>> > > linger.ms. As for retries, perhaps we can set the default retries
> to
> >>> > > infinite or just ignore it. Then the latency will be bounded by
> >>> > > max.message.delivery.wait.ms. request.timeout.ms is the max time
> the
> >>> > > request will be spending on the server. The client can expire an
> >>> inflight
> >>> > > request early if needed.
> >>> > >
> >>> > > 2. Well, since max.message.delivery.wait.ms specifies the max,
> >>> calling
> >>> > the
> >>> > > callback a bit early may be ok? Note that
> >>> max.message.delivery.wait.ms
> >>> > > only
> >>> > > comes into play in the rare error case. So, I am not sure if we
> need
> >>> to
> >>> > be
> >>> > > very precise. The issue with starting the clock on closing a batch
> is
> >>> > that
> >>> > > currently if the leader is not available, the closing of a batch
> can
> >>> be
> >>> > > delayed longer than linger.ms.
> >>> > >
> >>> > > 4. As you said, future.get(timeout) itself doesn't solve the
> problem
> >>>

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-15 Thread Sumant Tambe
Question about "the closing of a batch can be delayed longer than linger.ms":
Is it possible to cause an indefinite delay? At some point the bytes limit
might kick in. Also, why is the closing of a batch coupled with the availability
of its destination? In this approach a batch chosen for eviction due to delay
needs to "close" anyway, right (without regard to destination availability)?

I'm not too worried about notifying at the super-exact time specified in the
configs. But expiring before the full wait span has elapsed sounds a little
weird. So the expiration time has a +/- spread; it works more like a hint than
a max. So why not message.delivery.wait.hint.ms?

Yeah, a cancellable future will be similar in complexity.

I'm unsure if max.message.delivery.wait.ms will be the final nail for producer
timeouts. We still won't have a precise way to control delay in just the
accumulator segment. batch.expiry.ms does not try to abstract; it's very
specific.

My biggest concern at the moment is implementation complexity.

At this stage, I would like to encourage other independent opinions.

Regards,
Sumant

On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> wrote:

> Hi, Sumant,
>
> 1. Yes, it's probably reasonable to require max.message.delivery.wait.ms >
> linger.ms. As for retries, perhaps we can set the default retries to
> infinite or just ignore it. Then the latency will be bounded by
> max.message.delivery.wait.ms. request.timeout.ms is the max time the
> request will be spending on the server. The client can expire an inflight
> request early if needed.
>
> 2. Well, since max.message.delivery.wait.ms specifies the max, calling the
> callback a bit early may be ok? Note that max.message.delivery.wait.ms
> only
> comes into play in the rare error case. So, I am not sure if we need to be
> very precise. The issue with starting the clock on closing a batch is that
> currently if the leader is not available, the closing of a batch can be
> delayed longer than linger.ms.
>
> 4. As you said, future.get(timeout) itself doesn't solve the problem since
> you still need a way to expire the record in the sender. The amount of work
> to implement a cancellable future is probably the same?
>
> Overall, my concern with patch work is that we have iterated on the produce
> request timeout multiple times and new issues keep coming back. Ideally,
> this time, we want to have a solution that covers all cases, even though
> that requires a bit more work.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Thanks for looking into it.
> >
> > Yes, we did consider this message-level timeout approach and expiring
> > batches selectively in a request but rejected it due to the reasons of
> > added complexity without a strong benefit to counter-weigh that. Your
> > proposal is a slight variation so I'll mention some issues here.
> >
> > 1. It sounds like max.message.delivery.wait.ms will overlap with "time
> > segments" of both linger.ms and retries * (request.timeout.ms +
> > retry.backoff.ms). In that case, which config set takes precedence? It
> > would not make sense to configure configs from both sets. Especially, we
> > discussed exhaustively internally that retries and
> > max.message.delivery.wait.ms can't / shouldn't be configured together.
> > Retires become moot as you already mention. I think that's going to be
> > surprising to anyone wanting to use max.message.delivery.wait.ms. We
> > probably need max.message.delivery.wait.ms > linger.ms or something like
> > that.
> >
> > 2. If clock starts when a batch is created and expire when
> > max.message.delivery.wait.ms is over in the accumulator, the last few
> > messages in the expiring batch may not have lived long enough. As the
> > config seems to suggests per-message timeout, it's incorrect to expire
> > messages prematurely. On the other hand if clock starts after batch is
> > closed (which also implies that linger.ms is not covered by the
> > max.message.delivery.wait.ms config), no message would be be expired too
> > soon. Yeah, expiration may be little bit too late but hey, this ain't
> > real-time service.
> >
> > 3. I agree that steps #3, #4, (and #5) are complex to implement. On the
> > other hand, batch.expiry.ms is next to trivial to implement. We just
> pass
> > the config all the way down to ProducerBatch.maybeExpire and be done with
> > it.
> >
> > 4. Do you think the effect of max.message.delivery.wait.ms can be
> > simulated
> > with future.get(timeout) method? Copying excerpt from the kip-91: An
> > end-to-end timeout may b

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-11 Thread Sumant Tambe
Hi Jun,

Thanks for looking into it.

Yes, we did consider this message-level timeout approach and expiring
batches selectively in a request, but rejected it because of the added
complexity without a strong benefit to counterbalance it. Your
proposal is a slight variation, so I'll mention some issues here.

1. It sounds like max.message.delivery.wait.ms will overlap with the "time
segments" of both linger.ms and retries * (request.timeout.ms +
retry.backoff.ms). In that case, which config set takes precedence? It
would not make sense to configure settings from both sets. In particular, we
discussed exhaustively internally that retries and
max.message.delivery.wait.ms can't / shouldn't be configured together.
Retries become moot, as you already mention. I think that's going to be
surprising to anyone wanting to use max.message.delivery.wait.ms. We
probably need max.message.delivery.wait.ms > linger.ms or something like
that.

2. If the clock starts when a batch is created and the batch expires when
max.message.delivery.wait.ms is over in the accumulator, the last few
messages in the expiring batch may not have lived long enough. As the
config seems to suggest a per-message timeout, it's incorrect to expire
messages prematurely. On the other hand, if the clock starts after the batch is
closed (which also implies that linger.ms is not covered by the
max.message.delivery.wait.ms config), no message would be expired too
soon. Yeah, expiration may be a little bit too late, but hey, this isn't a
real-time service.

3. I agree that steps #3, #4, (and #5) are complex to implement. On the
other hand, batch.expiry.ms is next to trivial to implement. We just pass
the config all the way down to ProducerBatch.maybeExpire and be done with
it.

4. Do you think the effect of max.message.delivery.wait.ms can be simulated
with the future.get(timeout) method? Copying an excerpt from KIP-91: an
end-to-end timeout may be partially emulated using future.get(timeout).
The timeout must be greater than (batch.expiry.ms + nRetries * (
request.timeout.ms + retry.backoff.ms)). Note that when the future times out,
the Sender may continue to send the records in the background. To avoid that,
implementing a cancellable future is a possibility.

For simplicity, we could just implement a trivial method in the producer,
ProducerConfigs.maxMessageDeliveryWaitMs(), that returns a number based on
this formula. Users of future.get could use this timeout value.
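
A minimal sketch of that emulation (assuming an already-constructed
KafkaProducer named producer and a ProducerRecord named record; batch.expiry.ms
is only proposed here, so its value below is just a placeholder):

  // Emulated end-to-end bound, following the formula quoted above.
  long batchExpiryMs = 30000;      // placeholder for the proposed batch.expiry.ms
  int nRetries = 3;                // retries
  long requestTimeoutMs = 30000;   // request.timeout.ms
  long retryBackoffMs = 100;       // retry.backoff.ms
  long waitMs = batchExpiryMs + nRetries * (requestTimeoutMs + retryBackoffMs);

  Future<RecordMetadata> future = producer.send(record);
  try {
      RecordMetadata metadata = future.get(waitMs, TimeUnit.MILLISECONDS);
  } catch (TimeoutException e) {
      // Not completed within the emulated bound; the Sender may still deliver the
      // record in the background (hence the cancellable-future idea above).
  } catch (InterruptedException | ExecutionException e) {
      // The send failed or the wait was interrupted.
  }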

Thoughts?

Regards,
Sumant



On 11 August 2017 at 07:50, Sumant Tambe <suta...@gmail.com> wrote:

>
> Thanks for the KIP. Nice documentation on all current issues with the
>> timeout.
>
> For the KIP writeup, all credit goes to Joel Koshy.
>
> I'll follow up on your comments a little later.
>
>
>>
>> You also brought up a good use case for timing out a message. For
>> applications that collect and send sensor data to Kafka, if the data can't
>> be sent to Kafka for some reason, the application may prefer to buffer the
>> more recent data in the accumulator. Without a timeout, the accumulator
>> will be filled with old records and new records can't be added.
>>
>> Your proposal makes sense for a developer who is familiar with how the
>> producer works. I am not sure if this is very intuitive to the users since
>> it may not be very easy for them to figure out how to configure the new
>> knob to bound the amount of the time when a message is completed.
>>
>> From users' perspective, Apurva's suggestion of
>> max.message.delivery.wait.ms (which
>> bounds the time when a message is in the accumulator to the time when the
>> callback is called) seems more intuition. You listed this in the rejected
>> section since it requires additional logic to rebatch when a produce
>> request expires. However, this may not be too bad. The following are the
>> things that we have to do.
>>
>> 1. The clock starts when a batch is created.
>> 2. If the batch can't be drained within max.message.delivery.wait.ms, all
>> messages in the batch will fail and the callback will be called.
>> 3. When sending a produce request, we calculate an expireTime for the
>> request that equals to the remaining expiration time for the oldest batch
>> in the request.
>> 4. We set the minimum of the expireTime of all inflight requests as the
>> timeout in the selector poll call (so that the selector can wake up before
>> the expiration time).
>> 5. If the produce response can't be received within expireTime, we expire
>> all batches in the produce request whose expiration time has been reached.
>> For the rest of the batches, we resend them in a new produce request.
>> 6. If the producer response has a retriable error, we just backoff a bit
>> and then retry the produce request as today. The number of retries doesn't
>> 

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-11 Thread Sumant Tambe
> Thanks for the KIP. Nice documentation on all current issues with the
> timeout.

For the KIP writeup, all credit goes to Joel Koshy.

I'll follow up on your comments a little later.


>
> You also brought up a good use case for timing out a message. For
> applications that collect and send sensor data to Kafka, if the data can't
> be sent to Kafka for some reason, the application may prefer to buffer the
> more recent data in the accumulator. Without a timeout, the accumulator
> will be filled with old records and new records can't be added.
>
> Your proposal makes sense for a developer who is familiar with how the
> producer works. I am not sure if this is very intuitive to the users since
> it may not be very easy for them to figure out how to configure the new
> knob to bound the amount of the time when a message is completed.
>
> From users' perspective, Apurva's suggestion of
> max.message.delivery.wait.ms (which
> bounds the time when a message is in the accumulator to the time when the
> callback is called) seems more intuition. You listed this in the rejected
> section since it requires additional logic to rebatch when a produce
> request expires. However, this may not be too bad. The following are the
> things that we have to do.
>
> 1. The clock starts when a batch is created.
> 2. If the batch can't be drained within max.message.delivery.wait.ms, all
> messages in the batch will fail and the callback will be called.
> 3. When sending a produce request, we calculate an expireTime for the
> request that equals to the remaining expiration time for the oldest batch
> in the request.
> 4. We set the minimum of the expireTime of all inflight requests as the
> timeout in the selector poll call (so that the selector can wake up before
> the expiration time).
> 5. If the produce response can't be received within expireTime, we expire
> all batches in the produce request whose expiration time has been reached.
> For the rest of the batches, we resend them in a new produce request.
> 6. If the producer response has a retriable error, we just backoff a bit
> and then retry the produce request as today. The number of retries doesn't
> really matter now. We just keep retrying until the expiration time is
> reached. It's possible that a produce request is never retried due to
> expiration. However, this seems the right thing to do since the users want
> to timeout the message at this time.
>
> Implementation wise, there will be a bit more complexity in step 3 and 4,
> but probably not too bad. The benefit is that this is more intuitive to the
> end user.
>
> Does that sound reasonable to you?
>
> Thanks,
>
> Jun
>
>
> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > > > There seems to be no relationship with cluster metadata
> availability
> > or
> > > > > staleness. Expiry is just based on the time since the batch has
> been
> > > > ready.
> > > > > Please correct me if I am wrong.
> > > > >
> > > >
> > > > I was not very specific about where we do expiration. I glossed over
> > some
> > > > details because (again) we've other mechanisms to detect non
> progress.
> > > The
> > > > condition (!muted.contains(tp) && (isMetadataStale ||
> > > > > cluster.leaderFor(tp) == null)) is used in
> > > > RecordAccumualtor.expiredBatches:
> > > > https://github.com/apache/kafka/blob/trunk/clients/src/
> > > > main/java/org/apache/kafka/clients/producer/internals/
> > > > RecordAccumulator.java#L443
> > > >
> > > >
> > > > Effectively, we expire in all the following cases
> > > > 1) producer is partitioned from the brokers. When metadata age grows
> > > beyond
> > > > 3x it's max value. It's safe to say that we're not talking to the
> > brokers
> > > > at all. Report.
> > > > 2) fresh metadata && leader for a partition is not known && a batch
> is
> > > > sitting there for longer than request.timeout.ms. This is one case
> we
> > > > would
> > > > like to improve and use batch.expiry.ms because request.timeout.ms
> is
> > > too
> > > > small.
> > > > 3) fresh metadata && leader for a partition is known && batch is
> > sitting
> > > > there for longer than batch.expiry.ms. This is a new case that is
> > > > different
> > > > from #2. This is the catch-up mo

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-09 Thread Sumant Tambe
On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta  wrote:

> > > There seems to be no relationship with cluster metadata availability or
> > > staleness. Expiry is just based on the time since the batch has been
> > ready.
> > > Please correct me if I am wrong.
> > >
> >
> > I was not very specific about where we do expiration. I glossed over some
> > details because (again) we've other mechanisms to detect non progress.
> The
> > condition (!muted.contains(tp) && (isMetadataStale ||
> > > cluster.leaderFor(tp) == null)) is used in
> > RecordAccumualtor.expiredBatches:
> > https://github.com/apache/kafka/blob/trunk/clients/src/
> > main/java/org/apache/kafka/clients/producer/internals/
> > RecordAccumulator.java#L443
> >
> >
> > Effectively, we expire in all the following cases
> > 1) producer is partitioned from the brokers. When metadata age grows
> beyond
> > 3x it's max value. It's safe to say that we're not talking to the brokers
> > at all. Report.
> > 2) fresh metadata && leader for a partition is not known && a batch is
> > sitting there for longer than request.timeout.ms. This is one case we
> > would
> > like to improve and use batch.expiry.ms because request.timeout.ms is
> too
> > small.
> > 3) fresh metadata && leader for a partition is known && batch is sitting
> > there for longer than batch.expiry.ms. This is a new case that is
> > different
> > from #2. This is the catch-up mode case. Things are moving too slowly.
> > Pipeline SLAs are broken. Report and shutdown kmm.
> >
> > The second and the third cases are useful to a real-time app for a
> > completely different reason. Report, forget about the batch, and just
> move
> > on (without shutting down).
> >
> >
> If I understand correctly, you are talking about a fork of apache kafka
> which has these additional conditions? Because that check doesn't exist on
> trunk today.

Right. It is our internal release in LinkedIn.

> Or are you proposing to change the behavior of expiry to
> account for stale metadata and partitioned producers as part of this KIP?

No. It's our temporary solution in the absence of KIP-91. Note that we don't
like increasing request.timeout.ms. Without our extra conditions, our
batches expire too soon -- a problem in KMM catch-up mode.

If we get batch.expiry.ms, we will configure it to 20 minutes. maybeExpire
will use that config instead of request.timeout.ms. The extra conditions will
be unnecessary. All three cases shall be covered via the batch.expiry timeout.
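
Concretely, that configuration would look something like the following
(batch.expiry.ms is the config proposed by this KIP, not an existing producer
setting):

  Properties props = new Properties();
  props.put("batch.expiry.ms", "1200000");    // 20 minutes; proposed in KIP-91
  props.put("request.timeout.ms", "30000");   // stays small; no longer doubles as the expiry bound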

>
>


Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-09 Thread Sumant Tambe
Responses inline.

> > However, one thing which has not come out of the JIRA discussion is the
> > > actual use cases for batch expiry.
> >
> > There are two usecases I can think of for batch expiry mechanism
> > irrespective of how we try to bound the time (batch.expiry.ms or
> > max.message.delivery.wait.ms). Let's call it X.
> >
> > 1. A real-time app (e.g., periodic healthcheck producer, temperature
> sensor
> > producer) has a soft upper bound on both message delivery and failure
> > notification of message delivery. In both cases, it wants to know. Such
> an
> > app does not close the producer on the first error reported (due to batch
> > expiry) because there's data lined up right behind. It's ok to lose a few
> > samples of temperature measurement (IoT scenario). So it simply drops it
> > and moves on. May be when drop rate is like 70% it would close it. Such
> an
> > app may use acks=0. In this case, X will have some value in single digit
> > minutes. But X=MAX_LONG is not suitable.
> >
>
> I guess my question is: batches would only start expiring if partition(s)
> are unavailable. What would 'moving on' mean for a real time app in this
> case? How would that help?
>
Partition unavailability is one of the cases. The other one is KMM "catch
up" mode, where the producer's outbound throughput is lower than the consumer's
inbound throughput. This can happen when KMM consumers consume locally but the
producer produces remotely. In such a case, the pipeline latency SLA starts
to break. Batches remain in the accumulator for too long, but there's no
unavailable partition. Because it's KMM, it can't just give up on the
batches. It will shut itself down and alerts will fire.

By "moving on", I mean fire and forget. For a real-time app, in both cases
(a partition is unavailable, or simply the outbound throughput is low), it's
important to deliver newer data rather than trying hard to deliver old,
stale data.

>> After expiring one set of batches, the next set
>> would also be expired until the cluster comes back. So what is achieved by
>> expiring the batches at all?

Yes, expiring is important. Even the next series of batches should be expired
if they have crossed batch.expiry.ms. That means the data is stale, and it's a
hint to the middleware not to bother with this stale data and instead to use
the resources on newer data further back in the queue. An example of such an
app would be a healthcheck sensor thread that publishes application-specific
metrics periodically. A correct answer that arrives too late is a wrong answer.


>
> >
> > 2. Today we run KMM in Linkedin as if batch.expiry==MAX_LONG. We expire
> > under the condition: (!muted.contains(tp) && (isMetadataStale ||
> > cluster.leaderFor(tp) == null)) In essence, as long as the partition is
> > making progress (even if it's a trickle), the producer keeps on going.
> > We've other internal systems to detect whether a pipeline is making
> > *sufficient* progress or not. We're not dependent on the producer to tell
> > us that it's not making progress on a certain partition.
> >
> > This is less than ideal though. We would be happy to configure
> > batch.expiry.ms to 1800,000 or so and upon notification of expiry
> restart
> > the process and what not. It can tell also tell us which specific
> > partitions of a specific topic is falling behind. We achieve a similar
> > effect via alternative mechanisms.
>
>
> I presume you are referring to partitions here? If a some partitions are
> unavailable, what could mirror maker do with that knowledge? Presumably
> focus on the partitions which are online. But does that really help?  Today
> the expiry message already contains the partition information, is that
> being used? If not, how would accurate expiry times with something like
> batch.expiry.ms change that fact?
>
You are right, KMM can't do much with the partition information. My point is
this: if we assume for a moment that the producer is the only piece that can
tell us whether a KMM pipeline is making progress or not, would you configure
it to wait forever (batch.expiry=MAX_LONG) or would you pick some small
value? It's the latter we prefer. However, we really don't need the producer
to tell us, because we have other infrastructure pieces that tell us about
non-progress.

>
>
> >
> > > Also, the KIP document states the
> > > following:
> > >
> > > *The per message timeout is easy to compute - linger.ms + (retries + 1)
> > > * request.timeout.ms.* This is false.
> > >
> >
> > > Why is the statement false? Doesn't that provide an accurate upper bound
> > > on the timeout for a produce request today?
> > >
> > The KIP-91 write-up describes the reasons why. Just reiterating the reason:
> > "the condition that if the metadata for a partition is known then we do not
> > expire its batches even if they are ready". Do you not agree with the
> > explanation? If not, what part?
> >
> >
> Unless I have missed something, the producer doesn't seem to rely 

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-07 Thread Sumant Tambe
Please see the replies inline.

If we are going to have a separate configuration for expiry, I prefer my
> proposal of max.message.delivery.wait.ms and its semantics.
>
OK. I hope others will voice their preference too.


>
> However, one thing which has not come out of the JIRA discussion is the
> actual use cases for batch expiry.

There are two use cases I can think of for the batch expiry mechanism
irrespective of how we try to bound the time (batch.expiry.ms or
max.message.delivery.wait.ms). Let's call it X.

1. A real-time app (e.g., a periodic healthcheck producer or a temperature
sensor producer) has a soft upper bound on both message delivery and failure
notification of message delivery; either way, it wants to know within that
bound. Such an app does not close the producer on the first error reported
(due to batch expiry) because there's data lined up right behind. It's OK to
lose a few samples of temperature measurement (IoT scenario), so it simply
drops them and moves on. Maybe when the drop rate reaches something like 70%
it would close the producer. Such an app may use acks=0. In this case, X will
have some value in single-digit minutes, but X=MAX_LONG is not suitable.

2. Today we run KMM at LinkedIn as if batch.expiry == MAX_LONG. We expire
under the condition (!muted.contains(tp) && (isMetadataStale ||
cluster.leaderFor(tp) == null)). In essence, as long as the partition is
making progress (even if it's a trickle), the producer keeps going. We have
other internal systems to detect whether a pipeline is making *sufficient*
progress or not; we're not dependent on the producer to tell us that it's not
making progress on a certain partition.
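
In code form, that guard sits roughly like the sketch below inside the
accumulator's expiry pass. The names muted, isMetadataStale, and
expireBatchesOlderThan are simplified stand-ins, not the actual
RecordAccumulator internals:

    import java.util.Deque;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;

    final class ExpiryGuardSketch<B> {
        // Expire a partition's batches only when no request is in flight for it
        // (not muted) and the producer genuinely cannot make progress: the cluster
        // metadata is stale, or no leader is currently known for the partition.
        void maybeExpire(Map<TopicPartition, Deque<B>> batches,
                         Set<TopicPartition> muted,
                         Cluster cluster,
                         boolean isMetadataStale,
                         long requestTimeoutMs,
                         long nowMs) {
            for (Map.Entry<TopicPartition, Deque<B>> entry : batches.entrySet()) {
                TopicPartition tp = entry.getKey();
                if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) {
                    expireBatchesOlderThan(entry.getValue(), requestTimeoutMs, nowMs);
                }
            }
        }

        private void expireBatchesOlderThan(Deque<B> deque, long timeoutMs, long nowMs) {
            // Placeholder: walk the deque and fail batches created before (nowMs - timeoutMs).
        }
    }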

This is less than ideal, though. We would be happy to configure
batch.expiry.ms to 1,800,000 (30 minutes) or so and, upon notification of
expiry, restart the process and so on. It can also tell us which specific
partitions of a specific topic are falling behind. Today we achieve a similar
effect via alternative mechanisms.

In the absence of a general out-of-band mechanism for discovering slowness
(or non-progress), KIP-91 is an attempt to allow the producer to report
non-progress without using request.timeout.ms. Hence batch.expiry.ms.


> Also, the KIP document states the
> following:
>
> *The per message timeout is easy to compute - linger.ms + (retries + 1) *
> request.timeout.ms.* This is false.
>

> Why is the statement false? Doesn't that provide an accurate upper bound on
> the timeout for a produce request today?
>
The KIP-91 write-up describes the reasons why. Just reiterating the reason:
"the condition that if the metadata for a partition is known then we do not
expire its batches even if they are ready".  Do you not agree with the
explanation? If not, what part?

>
> Another point: the kip document keeps mentioning that the current timeouts
> are not intuitive, but for whom? In general, batch expiry as a notion is
> not intuitive and I am not sure the new settings change that fact.
>
Yeah, that's subjective.


>
> In this spirit, it might make sense to clarify the use case that motivates
> this additional setting. For instance, with this new configuration, how
> would your existing application handle a batch expired exception?

Again, a real-time app would just move on. KMM would halt. Any
order-sensitive app which needs to provide durability guarantees would
halt.


> How is it
> different from the way it handles the exception today?

It's not about how the existing applications will change their behavior.
It's about controlling *when*.


> Is the expiry
> exception a proxy for another piece of information like 'partition X is
> unavailable'?
>
Intriguing thought. If BatchExpiredException extends TimeoutException and
includes some context, such as the TopicPartition and broker id, an app could
provide differentiated service based on the topic name or the availability
zone of the broker. KIP-91 does not propose anything like that. It's a very
niche use case, though.
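
Purely as an illustration of that idea (again, KIP-91 does not propose it),
such a context-carrying exception could look like the sketch below, and a
callback could then branch on topicPartition().topic() or the broker id:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    // Hypothetical: not part of KIP-91 or the current client API.
    public class BatchExpiredException extends TimeoutException {
        private final TopicPartition topicPartition;
        private final int brokerId;

        public BatchExpiredException(String message, TopicPartition topicPartition, int brokerId) {
            super(message);
            this.topicPartition = topicPartition;
            this.brokerId = brokerId;
        }

        // Partition whose batch expired, for differentiated handling per topic.
        public TopicPartition topicPartition() {
            return topicPartition;
        }

        // Intended leader at the time the batch was created, e.g., to map to an availability zone.
        public int brokerId() {
            return brokerId;
        }
    }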

Regards,
Sumant

>
>
>
>
>
> On Thu, Aug 3, 2017 at 4:35 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > I don't want to list the alternatives in the JIRA as rejected just yet
> > because they are still being discussed. I would encourage the respective
> > proposers to do that. It's a wiki after all.
> >
> > As per my current understanding, there are two alternatives being
> > proposed.
> > The original kip-91 approach #1 and #2 from Apurva. Apurva, correct me if
> > I'm wrong.
> >
> > #1. The batch.expiry.ms proposal: In this proposal the config is meant to
> > control ONLY the accumulator timeout. See the second diagram in KIP-91. The
> > question "would the clock for batch expiry be reset every time the
> > batch is requeued after failure?"

Re: [DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-03 Thread Sumant Tambe
I don't want to list the alternatives in the JIRA as rejected just yet
because they are still being discussed. I would encourage the respective
proposers to do that. It's a wiki after all.

As per my current understanding, there are two alternatives being proposed.
The original kip-91 approach #1 and #2 from Apurva. Apurva, correct me if
I'm wrong.

#1. The batch.expiry.ms proposal: In this proposal the config is meant to
control ONLY the accumulator timeout. See the second diagram in KIP-91. The
question "would the clock for batch expiry be reset every time the batch is
requeued after failure?" does not arise here. There's no automatic
re-enqueue. An application calls send again if it needs to in response to an
expired-batch notification.
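
A sketch of what "calls send again" could look like under this proposal,
treating the expiry error as the TimeoutException the producer reports today
and leaving the retry budget entirely to the application:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.TimeoutException;

    // Re-submits a record a bounded number of times if its batch expired in the
    // accumulator; there is no automatic re-enqueue inside the producer itself.
    final class ResendOnExpiry<K, V> implements Callback {
        private final KafkaProducer<K, V> producer;
        private final ProducerRecord<K, V> record;
        private final int attemptsLeft;

        ResendOnExpiry(KafkaProducer<K, V> producer, ProducerRecord<K, V> record, int attemptsLeft) {
            this.producer = producer;
            this.record = record;
            this.attemptsLeft = attemptsLeft;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception instanceof TimeoutException && attemptsLeft > 0) {
                producer.send(record, new ResendOnExpiry<>(producer, record, attemptsLeft - 1));
            }
        }
    }

An application would kick it off with producer.send(record, new
ResendOnExpiry<>(producer, record, 3)); anything smarter, such as backing off
or giving up per partition, stays in application code.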

#2. The max.message.delivery.wait.ms proposal: From Apurva's comment:
"...  if `T + max.message.delivery.wait.ms` has elapsed and the message has
still not been successfully acknowledged..." This seems to suggest that the
config is meant to span time in the accumulator AND time spent during
network-level retries (if any). KIP-91 calls this approach the "end-to-end
timeout model" and lists it as rejected for the reasons explained there.

There are small variations proposed further back in the JIRA discussion.
I'll let the respective proposers decide whether those options are relevant
at this point.

-Sumant

On 3 August 2017 at 15:26, Jason Gustafson <ja...@confluent.io> wrote:

> Thanks for the KIP. Just a quick comment. Can you list the alternatives
> mentioned in the JIRA discussion in the rejected alternatives section?
>
> -Jason
>
> On Thu, Aug 3, 2017 at 3:09 PM, Sumant Tambe <suta...@gmail.com> wrote:
>
> > Hi all,
> >
> > KIP-91 [1] is another attempt to get better control on producer side
> > timeouts. In essence we're proposing a new config named batch.expiry.ms
> >  that will cause batches in the accumulator to expire after the
> configured
> > timeout.
> >
> > Recently, the discussion on KAFKA-5621 [2] has shed new light on the
> > proposal and some alternatives.
> >
> > Please share your thoughts here on the mailing list.
> >
> > Regards,
> > Sumant
> >
> > [1]
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 91+Provide+Intuitive+User+Timeouts+in+The+Producer
> > [2] https://issues.apache.org/jira/browse/KAFKA-5621
> >
>


[DISCUSS] KIP-91 Provide Intuitive User Timeouts in The Producer

2017-08-03 Thread Sumant Tambe
Hi all,

KIP-91 [1] is another attempt to get better control on producer side
timeouts. In essence we're proposing a new config named batch.expiry.ms
 that will cause batches in the accumulator to expire after the configured
timeout.

Recently, the discussion on KAFKA-5621 [2] has shed new light on the
proposal and some alternatives.

Please share your thoughts here on the mailing list.

Regards,
Sumant

[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
[2] https://issues.apache.org/jira/browse/KAFKA-5621


[jira] [Created] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread Sumant Tambe (JIRA)
Sumant Tambe created KAFKA-4395:
---

 Summary: KafkaConfig and LogConfig should not have static 
initialization order dependencies
 Key: KAFKA-4395
 URL: https://issues.apache.org/jira/browse/KAFKA-4395
 Project: Kafka
  Issue Type: Bug
Reporter: Sumant Tambe
Assignee: Sumant Tambe


LogConfig.configDef.serverDefaultConfigNames is not initialized properly due 
to static initialization order dependencies between KafkaConfig and LogConfig. 
The map ends up containing null values where string literals (KafkaConfig 
property names) were expected. Consider the following.

1. KafkaConfig begins initialization first because the KafkaServer constructor 
needs KafkaConfig. 
2. At KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
3. LogConfig begins initialization. 
4. LogConfig.configDef begins initialization. 
5. .define(UncleanLeaderElectionEnableProp) needs 
KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below 
KafkaConfig.LogMessageFormatVersionProp, so it is still null. 
6. The JVM cannot start another initialization of KafkaConfig because it is 
already in progress. 
7. So .define inserts null. This applies to all three of 
MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and CompressionTypeProp
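
A stripped-down Java analogue of the same hazard is below. The real classes
are Scala objects, and every name in the sketch is a simplified stand-in
rather than the actual Kafka code; the prop() indirection matters because
plain constant initializers would be inlined by the compiler and hide the
ordering problem.

    import java.util.HashMap;
    import java.util.Map;

    // Minimal Java illustration of a static-initialization-order cycle.
    public class InitOrderDemo {

        static String prop(String s) { return s; }   // defeats compile-time constant folding

        static class ServerConfig {                  // stands in for KafkaConfig
            static final String LOG_MESSAGE_FORMAT_VERSION_PROP = prop("log.message.format.version");
            // Touching LogLikeConfig here starts its initialization early...
            static final int LOG_CONFIG_SIZE = LogLikeConfig.SERVER_DEFAULT_NAMES.size();
            // ...so this field is still null when LogLikeConfig reads it below.
            static final String UNCLEAN_LEADER_ELECTION_ENABLE_PROP = prop("unclean.leader.election.enable");
        }

        static class LogLikeConfig {                 // stands in for LogConfig
            static final Map<String, String> SERVER_DEFAULT_NAMES = new HashMap<>();
            static {
                // Reads null because ServerConfig has not finished initializing yet.
                SERVER_DEFAULT_NAMES.put("unclean.leader.election.enable",
                                         ServerConfig.UNCLEAN_LEADER_ELECTION_ENABLE_PROP);
            }
        }

        public static void main(String[] args) {
            System.out.println(ServerConfig.LOG_CONFIG_SIZE);        // 1
            System.out.println(LogLikeConfig.SERVER_DEFAULT_NAMES);  // {unclean.leader.election.enable=null}
        }
    }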



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4395) KafkaConfig and LogConfig should not have static initialization order dependencies

2016-11-09 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4395:

Affects Version/s: 0.10.0.1

> KafkaConfig and LogConfig should not have static initialization order 
> dependencies
> --
>
> Key: KAFKA-4395
> URL: https://issues.apache.org/jira/browse/KAFKA-4395
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>    Assignee: Sumant Tambe
>
> LogConfig.configDef.serverDefaultConfigNames is not initialized properly due 
> to static initialization order dependencies between KafkaConfig and 
> LogConfig. The map ends up containing null values where string literals 
> (KafkaConfig property names) were expected. Consider the following.
> 1. KafkaConfig begins initialization first because the KafkaServer constructor 
> needs KafkaConfig. 
> 2. At KafkaConfig.LogMessageFormatVersionProp it needs LogConfig. 
> 3. LogConfig begins initialization. 
> 4. LogConfig.configDef begins initialization. 
> 5. .define(UncleanLeaderElectionEnableProp) needs 
> KafkaConfig.UncleanLeaderElectionEnableProp, which is defined below 
> KafkaConfig.LogMessageFormatVersionProp, so it is still null. 
> 6. The JVM cannot start another initialization of KafkaConfig because it is 
> already in progress. 
> 7. So .define inserts null. This applies to all three of 
> MinInSyncReplicasProp, UncleanLeaderElectionEnableProp, and CompressionTypeProp



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-26 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe reassigned KAFKA-4089:
---

Assignee: Sumant Tambe  (was: Dong Lin)

> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>    Assignee: Sumant Tambe
>
> The basic idea of batch expiration is that we don't expire batches when 
> producer thinks "it can make progress". Currently the notion of "making 
> progress" involves only in-flight requests (muted partitions). That's not 
> sufficient. The other half of the "making progress" is that if we have stale 
> metadata, we cannot trust it and therefore can't say we can't make progress. 
> Therefore, we don't expire batches when metadata is stale. This also implies 
> we don't want to expire batches when we can still make progress even if the 
> batch remains in the queue longer than the batch expiration time. 
> The current condition in {{abortExpiredBatches}} that bypasses muted 
> partitions is necessary but not sufficient. It should additionally restrict 
> ejection when metadata is stale. 
> Conversely, it should expire batches only when the following is true
> # !muted AND
> # meta-data is fresh AND
> # batch remained in the queue longer than request timeout.
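
In code form, the intended rule is roughly the sketch below; the parameter
names are illustrative, not the actual RecordAccumulator fields:

    // Sketch of the corrected expiry rule; all names are illustrative.
    final class ExpiryRule {
        static boolean shouldExpire(boolean muted, boolean metadataIsFresh,
                                    long timeInQueueMs, long requestTimeoutMs) {
            // Expire only when: not muted AND metadata is fresh AND the batch has
            // waited in the accumulator longer than the request timeout.
            return !muted && metadataIsFresh && timeInQueueMs > requestTimeoutMs;
        }
    }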



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4089:

Description: 
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh AND
# batch remained in the queue longer than request timeout.

  was:
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# meta-data is fresh AND
# batch remained in the queue longer than request timeout.


> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>      Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Dong Lin
>
> The basic idea of batch expiration is that we don't expire batches when 
> producer thinks "it can make progress". Currently the notion of "making 
> progress" involves only in-flight requests (muted partitions). That's not 
> sufficient. The other half of the "making progress" is that if we have stale 
> metadata, we cannot trust it and therefore can't say we can't make progress. 
> Therefore, we don't expire batches when metadata is stale. This also implies 
> we don't want to expire batches when we can still make progress even if the 
> batch remains in the queue longer than the batch expiration time. 
> The current condition in {{abortExpiredBatches}} that bypasses muted 
> partitions is necessary but not sufficient. It should additionally restrict 
> ejection when metadata is stale. 
> Conversely, it should expire batches only when the following is true
> # !muted AND
> # meta-data is fresh AND
> # batch remained in the queue longer than request timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4089:

Description: 
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# meta-data is fresh AND
# batch remained in the queue longer than request timeout.

  was:
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

More concretely, the batch expiration logic 
({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 
case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). As 
a consequence, {{Sender.drain}} does not drain any batch at all and therefore 
no new topic-partitions are muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, 
everything that was not sent in previous drains is subject to expiration. As a 
result, a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 
# batch remained in the queue longer than request timeout.


> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>      Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Dong Lin
>
> The basic idea of batch expiration is that we don't expire batches when 
> producer thinks "it can make progress". Currently the notion of "making 
> progress" involves only in-flight requests (muted partitions). That's not 
> sufficient. The other half of the "making progress" is that if we have stale 
> metadata, we cannot trust it and therefore can't say we can't make progress. 
> Therefore, we don't expire batches when metadata is stale. This also implies 
> we don't want to expire batches when we can still make progress even if the 
> batch remains in the queue longer than the batch expiration time. 
> The current condition in {{abortExpiredBatches}} that bypasses muted 
> partitions is necessary but not sufficient. It should additionally restrict 
> ejection when metadata is stale. 
> Conversely, it should expire batches only when the following is true
> # meta-data is fresh AND
> # batch remained in the queue longer than request timeout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4089:

Description: 
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

More concretely, the batch expiration logic 
({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 
case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). As 
a consequence, {{Sender.drain}} does not drain any batch at all and therefore 
no new topic-partitions are muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, 
everything that was not sent in previous drains is subject to expiration. As a 
result, a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 
# batch remained in the queue longer than request timeout.

  was:
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

More concretely, the batch expiration logic 
({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 
case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). As 
a consequence, {{Sender.drain}} does not drain any batch at all and therefore 
no new topic-partitions are muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, 
everything that was not sent in the last drain is subject to expiration. As a 
result, a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 
# batch remained in the queue longer than request timeout.


> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Dong Lin
>
> The basic idea of batch expiration is that we don't expire batches when 
> producer thinks "it can make progress". Currently the notion of "making 
> progress" involves only in-flight requests (muted partitions). That's not 
> sufficient. The other half of the "making progress" is that if we have stale 
> metadata, we cannot trust it and therefore can't say we can't make progress. 
> Therefore, we don't expire batches when metadata is stale. This also implies 
> we don't want to expire batches when we can still make progress even if the 
> batch remains in the queue longer than the batch expiration time. 
> More concretely, the batch expiration logic 
> ({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
> cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 

[jira] [Updated] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4089:

Description: 
The basic idea of batch expiration is that we don't expire batches when 
producer thinks "it can make progress". Currently the notion of "making 
progress" involves only in-flight requests (muted partitions). That's not 
sufficient. The other half of the "making progress" is that if we have stale 
metadata, we cannot trust it and therefore can't say we can't make progress. 
Therefore, we don't expire batches when metadata is stale. This also implies we 
don't want to expire batches when we can still make progress even if the batch 
remains in the queue longer than the batch expiration time. 

More concretely, the batch expiration logic 
({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 
case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). As 
a consequence, {{Sender.drain}} does not drain any batch at all and therefore 
no new topic-partitions are muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, 
everything that was not sent in the last drain is subject to expiration. As a 
result, a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally restrict ejection when 
metadata is stale. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 
# batch remained in the queue longer than request timeout.

  was:
The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) ejects 
batches out when the cluster metadata needs an update 
({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to send 
data to ({{result.readyNodes}} is empty). As a consequence, {{Sender.drain}} 
does not drain any batch at all and therefore no new topic-partitions are 
muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, all 
batches, regardless of topic-partition, are subject to expiration. As a result, 
a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

Expiring batches unconditionally is a bug. It's too greedy. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally bypass partitions for 
which leader information is known and fresh. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 


> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Dong Lin
>
> The basic idea of batch expiration is that we don't expire batches when 
> producer thinks "it can make progress". Currently the notion of "making 
> progress" involves only in-flight requests (muted partitions). That's not 
> sufficient. The other half of the "making progress" is that if we have stale 
> metadata, we cannot trust it and therefore can't say we can't make progress. 
> Therefore, we don't expire batches when metadata is stale. This also implies 
> we don't want to expire batches when we can still make progress even if the 
> batch remains in the queue longer than the batch expiration time. 
> More concretely, the batch expiration logic 
> ({{RecordAccumulator.abortExpiredBatches}}) ejects batches out when the 
> cluster metadata needs an update ({{Metadata.timeToNextUpdate==0}}). In this 
> case, no nodes are "ready" to send data to ({{result.readyNodes}} is empty). 
> As a consequence, {{Sender.drain}} does not drain any batch at all and 
> therefore no new topic-partitions are muted. 
> The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
> bypasses muted partitions only. As there are no new muted partitions, 
> everything that was not sent in the last drain is subject to expiration. As a 
> result, a group of batches expire if they linger in the queue for longer than 
> {{requestTimeout}}.
> The current condition in {{abortExpiredBatches}

[jira] [Updated] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sumant Tambe updated KAFKA-4089:

Description: 
The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) ejects 
batches out when the cluster metadata needs an update 
({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to send 
data to ({{result.readyNodes}} is empty). As a consequence, {{Sender.drain}} 
does not drain any batch at all and therefore no new topic-partitions are 
muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, all 
batches, regardless of topic-partition, are subject to expiration. As a result, 
a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

Expiring batches unconditionally is a bug. It's too greedy. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally bypass partitions for 
which leader information is known and fresh. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 

  was:
The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) ejects 
batches out when the cluster metadata needs an update 
({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to send 
data to ({{result.readyNodes}} is empty). As a consequence, {{Sender.drain}} 
does not drain any batch at all and therefore no new topic-partitions are 
muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, all 
batches, regardless of topic-partition, are subject to expiration. As a result, 
a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

Expiring batches unconditionally is a bug. It's too greedy. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally bypass partitions for 
which leader information is known and fresh. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 


> KafkaProducer raises Batch Expired exception 
> -
>
> Key: KAFKA-4089
> URL: https://issues.apache.org/jira/browse/KAFKA-4089
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Sumant Tambe
>Assignee: Dong Lin
>
> The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
> ejects batches out when the cluster metadata needs an update 
> ({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to 
> send data to ({{result.readyNodes}} is empty). As a consequence, 
> {{Sender.drain}} does not drain any batch at all and therefore no new 
> topic-partitions are muted. 
> The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
> bypasses muted partitions only. As there are no new muted partitions, all 
> batches, regardless of topic-partition, are subject to expiration. As a 
> result, a group of batches expire if they linger in the queue for longer than 
> {{requestTimeout}}.
> Expiring batches unconditionally is a bug. It's too greedy. 
> The current condition in {{abortExpiredBatches}} that bypasses muted 
> partitions is necessary but not sufficient. It should additionally bypass 
> partitions for which leader information is known and fresh. 
> Conversely, it should expire batches only when the following is true
> # !muted AND
> # meta-data is fresh but leader not available 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4089) KafkaProducer raises Batch Expired exception

2016-08-25 Thread Sumant Tambe (JIRA)
Sumant Tambe created KAFKA-4089:
---

 Summary: KafkaProducer raises Batch Expired exception 
 Key: KAFKA-4089
 URL: https://issues.apache.org/jira/browse/KAFKA-4089
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.1
Reporter: Sumant Tambe


The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) ejects 
batches out when the cluster metadata needs an update 
({{Metadata.timeToNextUpdate==0}}). In this case, no nodes are "ready" to send 
data to ({{result.readyNodes}} is empty). As a consequence, {{Sender.drain}} 
does not drain any batch at all and therefore no new topic-partitions are 
muted. 

The batch expiration logic ({{RecordAccumulator.abortExpiredBatches}}) 
bypasses muted partitions only. As there are no new muted partitions, all 
batches, regardless of topic-partition, are subject to expiration. As a result, 
a group of batches expire if they linger in the queue for longer than 
{{requestTimeout}}.

Expiring batches unconditionally is a bug. It's too greedy. 

The current condition in {{abortExpiredBatches}} that bypasses muted partitions 
is necessary but not sufficient. It should additionally bypass partitions for 
which leader information is known and fresh. 

Conversely, it should expire batches only when the following is true
# !muted AND
# meta-data is fresh but leader not available 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1911) Log deletion on stopping replicas should be async

2016-07-25 Thread Sumant Tambe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15392877#comment-15392877
 ] 

Sumant Tambe commented on KAFKA-1911:
-

Hi [~mgharat], I would like to take your patch over. I've rebased your patch on 
top of the latest trunk and fixed a couple of test failures. I'll send a 
pull-request soon.

> Log deletion on stopping replicas should be async
> -
>
> Key: KAFKA-1911
> URL: https://issues.apache.org/jira/browse/KAFKA-1911
> Project: Kafka
>  Issue Type: Bug
>  Components: log, replication
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>  Labels: newbie++, newbiee
>
> If a StopReplicaRequest sets delete=true then we do a file.delete on the file 
> message sets. I was under the impression that this is fast but it does not 
> seem to be the case.
> On a partition reassignment in our cluster the local time for stop replica 
> took nearly 30 seconds.
> {noformat}
> Completed request:Name: StopReplicaRequest; Version: 0; CorrelationId: 467; 
> ClientId: ;DeletePartitions: true; ControllerId: 1212; ControllerEpoch: 
> 53 from 
> client/...:45964;totalTime:29191,requestQueueTime:1,localTime:29190,remoteTime:0,responseQueueTime:0,sendTime:0
> {noformat}
> This ties up one API thread for the duration of the request.
> Specifically in our case, the queue times for other requests also went up and 
> producers to the partition that was just deleted on the old leader took a 
> while to refresh their metadata (see KAFKA-1303) and eventually ran out of 
> retries on some messages leading to data loss.
> I think the log deletion in this case should be fully asynchronous although 
> we need to handle the case when a broker may respond immediately to the 
> stop-replica-request but then go down after deleting only some of the log 
> segments.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)