Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-06-02 Thread Jay Kreps
Quick follow-up on the discussion on KIP-19. For partitionsFor() I think
the question was whether to use max.enqueue.block.ms or request.timeout.ms
to control the timeout. The proposed plan was to use request.timeout.ms.
Alternatively, we could rename the config max.enqueue.block.ms to max.block.ms
and use that instead. The argument for this is that in both cases you are
configuring the time the client will block.

I think this is sort of a corner case so I am +1 either way.

-Jay


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-06-02 Thread Jiangjie Qin
Thanks Jay. max.block.ms looks good. I will update the wiki page.

Jiangjie (Becket) Qin
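For context, a minimal sketch of how the agreed max.block.ms rename would look in producer properties. The broker address and timeout value are illustrative, not taken from this thread:

```java
import java.util.Properties;

public class MaxBlockConfig {
    /** Producer properties using the renamed max.block.ms setting, which
     *  bounds how long send() and partitionsFor() may block (waiting on
     *  metadata or buffer space). Values here are illustrative only. */
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("max.block.ms", "60000");               // block at most 60s
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("max.block.ms")); // 60000
    }
}
```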


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-06-02 Thread Ewen Cheslack-Postava
Option 3 seems a lot better than previous options, especially from the
user's perspective. I think it gives a reasonable balance between control and
fewer options, and the only implementation details it's exposing are that
there is a buffer and there is a network request. Making the request
timeout only start after enqueuing still allows you to compute a maximum
timeout for a request by adding the two values, but doesn't have annoying
artifacts like sometimes issuing a network request when there's only a
fraction of a millisecond left for it to complete.

REQUEST_TIMEOUT_DOC could probably add something about the retries, e.g.
something like "This timeout is per retry, so the maximum time spent
waiting for a request to complete will be (retries+1) *
network.request.timeout.ms."
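To make the retry arithmetic concrete, a small helper (plain Java, not producer code; the method and parameter names are mine):

```java
public class RetryTimeoutMath {
    /**
     * Worst-case time spent waiting for a request to complete when the
     * network request timeout is applied per try: the original attempt
     * plus `retries` retries, each allowed up to requestTimeoutMs.
     * Retry backoff is ignored here for simplicity.
     */
    public static long maxWaitMs(int retries, long requestTimeoutMs) {
        return (retries + 1L) * requestTimeoutMs;
    }

    public static void main(String[] args) {
        // e.g. retries=2 with a 30s network.request.timeout.ms
        System.out.println(maxWaitMs(2, 30_000)); // 90000
    }
}
```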

There's also one other use of the metadata fetch timeout in partitionsFor.
Are we converting that to use MAX_ENQUEUE_TIMEOUT_MS_CONFIG? The naming is
a bit awkward, but we need to use something there.

Finally, just a nit, but the naming conventions for variables are getting
inconsistent. Some have _MS in them, some don't, and some of the _DOC names
are inconsistent with the _CONFIG names.

-Ewen



Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-06-01 Thread Jiangjie Qin
Bump up this thread.

After several discussions in LinkedIn, we came up with three options. I
have updated the KIP-19 wiki page to summarize the three options and
stated our preference. We can discuss them in tomorrow’s KIP hangout.
Please let us know what you think.

Thanks,

Jiangjie (Becket) Qin

On 5/21/15, 5:54 PM, Jiangjie Qin j...@linkedin.com wrote:

Based on the discussion we have had, I just updated the KIP with the
following proposal and want to see if there are further comments.

The proposal is to have the following four timeouts as the end state.

1. max.buffer.full.block.ms   - To replace block.on.buffer.full. The max
time to block when the buffer is full.
2. metadata.fetch.timeout.ms  - Reuse the metadata timeout as batch.timeout.ms,
because an expired batch essentially means metadata was not available.
3. replication.timeout.ms - It defines how long a server will wait for
the records to be replicated to followers.
4. network.request.timeout.ms - This timeout is used when the producer sends
requests to brokers over TCP connections. It specifies how long the
producer should wait for the response.

With the above approach, we can achieve the following.
* We can have bounded blocking time for send() = (1) + (2).
* The time after send() until the response is received is generally bounded
by linger.ms + (2) + (4), not taking retries into consideration.

So from the user’s perspective, send() depends on metadata of a topic and
buffer space. I am not sure users would really care about how long it
takes to receive the response, because it is async anyway and there are so
many things to consider (retries, linger.ms, retry backoff time, request
timeout, etc.).

I think these configurations are clear enough for users to understand at
first glance. Please let me know what you think.

Thanks.

Jiangjie (Becket) Qin
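The two bounds above can be written out explicitly. A sketch treating the proposed config values as plain parameters (these names were the proposal under discussion in this thread, not shipped settings):

```java
public class ProducerTimeoutBounds {
    // Bound on send() blocking:
    // (1) max.buffer.full.block.ms + (2) metadata.fetch.timeout.ms
    public static long maxSendBlockMs(long bufferFullBlockMs,
                                      long metadataFetchTimeoutMs) {
        return bufferFullBlockMs + metadataFetchTimeoutMs;
    }

    // Bound on time from send() to response:
    // linger.ms + (2) metadata.fetch.timeout.ms + (4) network.request.timeout.ms,
    // ignoring retries, as in the email.
    public static long maxTimeToResponseMs(long lingerMs,
                                           long metadataFetchTimeoutMs,
                                           long networkRequestTimeoutMs) {
        return lingerMs + metadataFetchTimeoutMs + networkRequestTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(maxSendBlockMs(60_000, 60_000));          // 120000
        System.out.println(maxTimeToResponseMs(0, 60_000, 30_000));  // 90000
    }
}
```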




Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-20 Thread Joel Koshy
 The fact that I understand the producer internals and am still struggling
 to understand the implications of the different settings, how I would set
 them, and how they potentially interact such that I could set invalid
 combinations seems like a red flag to me... Being able to say "I want
 produce requests to time out in 5s" shouldn't require adjusting 3 or 4
 configs if the defaults would normally time out in something like 30s.
 
 Setting aside compatibility issues and focusing on the best set of configs,
 I agree with Jay that there are two things I actually want out of the API.
 The key thing is a per-request timeout, which should be enforced client
 side. I would just expect this to follow the request through any internals
 so it can be enforced no matter where in the pipeline the request is.
 Within each component in the pipeline we might have to compute how much
 time we have left for the request in order to create a timeout within that
 setting. The second setting is to bound the amount of time spent blocking
 on send(). This is really an implementation detail, but one that people are
 complaining about enough that it seems worthwhile to provide control over
 it (and fixing it would just make that setting superfluous, not break
 anything).

 Exposing a lot more settings also exposes a lot about the implementation
 and makes it harder to improve the implementation in the future, but I
 don't think we have listed good use cases for setting each of them
 individually. Why would the user specifically care about how much time the
 request spends in the accumulator vs. some other component (assuming they
 have the overall timeout)? Same for requests in flight, as long as I have
 that client side timeout? And if they care about what component is the
 bottleneck, could that be better exposed by the exceptions that are
 returned rather than a ton of different settings?

Agreed with the above. I'm also extremely wary of configs that are
inherently unintuitive, or can interact to yield unintuitive behavior.
OTOH I think it is okay if a config is categorized as advanced or if
it requires deeper knowledge of the internals of the producer (or the
configured system in general). i.e., as long as we think long and hard
and agree on necessity (driven by clear use cases) before adding such
configs. We should also consider how we can simplify or even eliminate
existing configs.

Re: requests in flight may be a good example: Becket had given a valid
use-case, i.e., supporting strict ordering. Maybe we can replace it with an
enable.strict.ordering config, which is clearer in intent and would
internally ensure only one in-flight request per partition, and default
to a fixed number of in-flight requests (say, five or ten) if set to false. If we
implement idempotence then we won't even need that.
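For illustration: enable.strict.ordering is a name floated here, not an existing setting. The closest existing producer knob for the same effect is the real max.in.flight.requests.per.connection config. A sketch, with the broker address and values illustrative:

```java
import java.util.Properties;

public class StrictOrderingConfig {
    /** Producer properties approximating strict ordering with today's
     *  configs: with one in-flight request per connection, a retried
     *  batch cannot be reordered ahead of a later batch. */
    public static Properties strictOrderingProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("retries", "3"); // retries are safe for ordering under this setting
        return props;
    }
}
```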

 On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Hi Jay,
 
  I updated what I think in the KIP wiki. Just a short summary here. Because we
  need timeout for:
  1. Send()
  2. Batches in accumulator
  3. Requests in flight.
  That means we need to have at least three configurations if we do not
  reuse configurations.
 
  I think we probably want to separate the configurations for exception
  handling from those for SLA purposes as well.
  My understanding of the configurations we are discussing here is they are
  for exception handling but not for SLA purposes. It looks to me that
  exception handling is more component oriented while SLA is more of
  systematic tuning. What you suggested sounds more like setting
  configurations to meet a user-defined SLA. I am not sure if this is the
  thing we want to do here.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Yeah I think linger.ms remains separate; setting that is a performance
  optimization rather than a failure-handling thing. We should ideally sanity
  check this, though, in my proposal, since if they set linger.ms >
  request.timeout then that won't work.
  
  It's true that in my proposal the actual replication timeout we set on
  the request would be non-deterministic. However, the flip side of that
  argument is that in the existing proposal the actual time until an
  acknowledgement is non-deterministic, right? So I think the argument I am
  trying to construct is that the two things the user cares about are the
  time to block and the time to ack and any other timeout we use internally
  is basically an implementation detail of ensuring this.
  
  Your point about the difference between batches and requests is a good
  one. I hadn't thought of that. So to make my proposal work we would need to
  do something like base the request timeout off the oldest batch. Let me
  think about the implications of that; it's definitely a problem.
  
  -Jay
  
  On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
  
  wrote:
  
   Hey Jay,
  
   That is also a viable solution.
  
   I think 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-20 Thread Jiangjie Qin
Hey Ewen,

I agree with you that we should avoid exposing any unnecessary configurations
to users, and that necessity is defined by use cases.
I also agree that the configurations should be named from the users’
perspective and match their intuition - for example, like what Joel
said, something like enable.strict.ordering.
That is exactly why I think it makes sense to just use the metadata timeout to
expire the batches in the accumulator without adding another configuration.

I still have some concerns about having only one timeout configuration
after messages are appended to the accumulator. IMO, this will cause problems
in some scenarios. What value do we expect the user to set? If a batch
stayed in the accumulator for a while and maybe only 1 second is left before
the timeout, are we going to send it or not?
From what I can see, the network request timeout is there to give a broker
enough headroom to respond a little slowly, while still avoiding sticking
to a dead broker for too long. It should be independent of other settings.
If we agree on this, then that indicates we need to have
a *stand-alone* network.request.timeout.ms.

If we really want to give users some kind of guarantee on the blocking time of
send() and minimize the configurations, I feel it makes more sense to have
a timeout covering both the time blocked on send() and the time spent in the
accumulator. But this again raises a similar question: what if send() blocked
for some time and there is only 1 second left when we put the message into
the accumulator?

Also, it is not clear to me how one configuration would handle the timeouts
we have, which apply to different entities, specifically:
1. In send() - messages
2. In accumulator - batches
3. In NetworkClient - requests

I think there are two ways to think about the configurations:
1. Set configurations by step, and let users specify how long they want
to wait for each step - I think this is what you and Jay are opposing,
and I agree.
2. Tell users what things are required for a message to be sent, and let
them set how long they are willing to wait on each of them. So we have
metadata, buffer pool, request timeout, replication timeout.

I prefer the second way. In this case we can have:
A. metadata.timeout.ms - used in send() and the accumulator
B. blocking.on.buffer.full.ms - used in send()
C. network.request.timeout.ms - used in NetworkClient
D. replication.timeout.ms - used on the broker

A send() will block for at most A + B, and the send-to-response time can be at
most A + B + C + D.

It is very clear to users what kind of things they are configuring.
Arguably, we are exposing internals to users, but that is better than
leaving some ambiguity and later having to explain the internal details
separately.

Thanks.

Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Ewen Cheslack-Postava
The fact that I understand the producer internals and am still struggling
to understand the implications of the different settings, how I would set
them, and how they potentially interact such that I could set invalid
combinations seems like a red flag to me... Being able to say "I want
produce requests to time out in 5s" shouldn't require adjusting 3 or 4
configs if the defaults would normally time out in something like 30s.

Setting aside compatibility issues and focusing on the best set of configs,
I agree with Jay that there are two things I actually want out of the API.
The key thing is a per-request timeout, which should be enforced client
side. I would just expect this to follow the request through any internals
so it can be enforced no matter where in the pipeline the request is.
Within each component in the pipeline we might have to compute how much
time we have left for the request in order to create a timeout within that
setting. The second setting is to bound the amount of time spent blocking
on send(). This is really an implementation detail, but one that people are
complaining about enough that it seems worthwhile to provide control over
it (and fixing it would just make that setting superfluous, not break
anything).

Exposing a lot more settings also exposes a lot about the implementation
and makes it harder to improve the implementation in the future, but I
don't think we have listed good use cases for setting each of them
individually. Why would the user specifically care about how much time the
request spends in the accumulator vs. some other component (assuming they
have the overall timeout)? Same for requests in flight, as long as I have
that client side timeout? And if they care about what component is the
bottleneck, could that be better exposed by the exceptions that are
returned rather than a ton of different settings?
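Ewen's idea of a per-request timeout that "follows the request through any internals" can be sketched as a deadline object attached to the request; each pipeline stage asks how much time is left rather than consulting its own config. A minimal sketch, with hypothetical names (`Deadline`, `remaining_ms`) that are not part of the producer API:

```python
import time

class Deadline:
    """A per-request deadline that travels with the request through the
    producer pipeline (send() -> accumulator -> network), so each stage
    can compute the time it has left.  Illustrative only, not Kafka code."""

    def __init__(self, timeout_ms):
        self.expires_at = time.monotonic() + timeout_ms / 1000.0

    def remaining_ms(self):
        return max(0.0, (self.expires_at - time.monotonic()) * 1000.0)

    def expired(self):
        return self.remaining_ms() == 0.0

# Each pipeline stage uses the remaining time rather than its own config:
deadline = Deadline(timeout_ms=5000)
accumulator_wait = min(100, deadline.remaining_ms())  # e.g. linger budget
network_timeout = deadline.remaining_ms()             # time left for the wire
assert network_timeout <= 5000
```

With this shape there is no "fraction of a millisecond left" artifact: a stage that sees `expired()` fails the request instead of issuing a doomed network call.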



On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Jay,

 I updated what I think in the KIP wiki. Just a short summary here. Because we
 need timeout for:
 1. Send()
 2. Batches in accumulator
 3. Requests in flight.
 That means we need to have at least three configurations if we do not
 reuse configurations.

 I think we probably also want to separate the configurations for exception
 handling from those for SLA purposes.
 My understanding is that the configurations we are discussing here are for
 exception handling, not for SLAs. It looks to me that exception handling is
 more component-oriented, while SLA tuning is more systemic. What you
 suggested sounds more like setting configurations to meet a user-defined
 SLA. I am not sure that is what we want to do here.

 Thanks.

 Jiangjie (Becket) Qin

 On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Yeah I think linger.ms remains separate; setting that is a performance
 optimization rather than a failure-handling thing. We should ideally sanity
 check this, though, in my proposal, since if they set linger.ms >
 request.timeout then that won't work.
 
 It's true that in my proposal the actual replication timeout we set
 on
 the request would be non-deterministic. However the flip side of that
 argument is that in the existing proposal the actual time until an
 acknowledgement is non-deterministic, right? So I think the argument I am
 trying to construct is that the two things the user cares about are the
 time to block and the time to ack and any other timeout we use internally
 is basically an implementation detail of ensuring this.
 
 Your point about the difference between batches and requests is a good
 one.
 I hadn't thought of that. So to make my proposal work we would need to do
 something like base the request time off the oldest batch. Let me think
 about the implications of that, it's definitely a problem.
 
 -Jay
 
 On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hey Jay,
 
  That is also a viable solution.
 
  I think the main purpose is to let user know how long they can block,
  which is important.
 
  I have some questions about the proposal, though. Will users still need to
  set linger.ms? Will the request timeout cover linger.ms as well?
  My concern with letting the request timeout also cover the time spent in
  the accumulator is that it makes the actual request timeout
  non-deterministic.
  Also, implementation-wise, a request can contain multiple batches, and the
  time spent in the accumulator can vary a lot. If one of the batches times
  out, what should we do with the rest of the batches?
  I think we probably want to separate the batch timeout and the request
  timeout.
 
  Maybe we can do this:
  Max.send.block.ms
  Request.timeout
  Batch.timeout
  Replication.timeout
 
  So in send() we use max.send.block.ms only. In accumulator, we use
  batch.timeout; in NetworkClient, we use request.timeout. Replication
  timeout is needed anyway.
 
  This looks more understandable from what I can see.
 
  What do 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jiangjie Qin
Hey Mayuresh,

I think our purpose is to find the simplest user interface that still
supports all the flexibility users might need.
In that sense, we have to expose:
1. linger.ms for batching purposes,
2. a request timeout to accommodate different RTTs,
3. some blocking timeout for send(),
etc.

So a single timeout might not provide the flexibility we want to give
users.
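For a rough sense of how these separate knobs compose, the worst-case latency a user sees is approximately the sum of the stages (with the in-flight stage multiplied across retries, per the request-timeout semantics discussed in this thread). The config names below follow the proposals in this thread and are not actual Kafka settings:

```python
# Hypothetical producer settings, named after the proposals in this thread
# (max.send.block.ms, batch.timeout, request.timeout); not real Kafka configs.
max_send_block_ms  = 1_000   # bound on time send() may block
batch_timeout_ms   = 10_000  # bound on time a batch may sit in the accumulator
request_timeout_ms = 5_000   # bound on each in-flight request attempt
retries = 2

# Upper bound on time from send() returning to the future completing,
# assuming the request timeout applies per retry:
worst_case_after_send_ms = batch_timeout_ms + (retries + 1) * request_timeout_ms
print(worst_case_after_send_ms)  # 25000

# End-to-end bound including the blocking portion of send():
end_to_end_bound_ms = max_send_block_ms + worst_case_after_send_ms
print(end_to_end_bound_ms)  # 26000
```

This is the arithmetic a user would have to do mentally under the multi-config design, which is Mayuresh's and Ewen's complexity objection in a nutshell.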

I’ll revise the wiki and maybe we can go from there.

Thanks.

Jiangjie (Becket) Qin

On 5/19/15, 12:51 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

IMO, having four different timeouts makes it confusing for the user and
requires the client to understand the internals of Kafka. We should have a
single timeout from the user's perspective and handle the other timeouts,
like a batch timeout, internally.

Mayuresh

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Jay,

 That is also a viable solution.

 I think the main purpose is to let user know how long they can block,
 which is important.

 I have some questions about the proposal, though. Will users still need to
 set linger.ms? Will the request timeout cover linger.ms as well?
 My concern with letting the request timeout also cover the time spent in
 the accumulator is that it makes the actual request timeout
 non-deterministic.
 Also, implementation-wise, a request can contain multiple batches, and the
 time spent in the accumulator can vary a lot. If one of the batches times
 out, what should we do with the rest of the batches?
 I think we probably want to separate the batch timeout and the request
 timeout.

 Maybe we can do this:
 Max.send.block.ms
 Request.timeout
 Batch.timeout
 Replication.timeout

 So in send() we use max.send.block.ms only. In accumulator, we use
 batch.timeout; in NetworkClient, we use request.timeout. Replication
 timeout is needed anyway.

 This looks more understandable from what I can see.

 What do you think?

 Jiangjie (Becket) Qin

 On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

 So the alternative to consider would be to instead have
max.block.ms (or something)
request.timeout
replication.timeout
 
 I think this better captures what the user cares about. Here is how it
 would work.
 
 *max.send.block.ms* is the bound on the maximum time the producer.send()
 call can block.
 This subsumes the existing metadata timeout use case but not the
proposed
 use for the time in the accumulator. It *also* acts as a bound on the
time
 you can block on BufferPool allocation (we'd have to add this but that
 should be easy).
 
 *request.timeout* is the bound on the time after send() completes until you
 get an acknowledgement. This covers the connection timeout and the time in
 the accumulator. So to implement this, the time we set in the request sent
 via NetworkClient would have already subtracted off the time spent in the
 accumulator, and if the request is retried we would include both the time
 in the accumulator and the time taken for the first request, etc. In other
 words this is the upper bound on the time to the Future being satisfied.
 
 *replication.timeout* will default to something reasonable but maybe
you
 can override it if you want?
 
 Thoughts?
 
 -Jay
 
 On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:
 
   So what I understand is that we would have three timeouts:
  1) replication timeout
  2) request timeout
  3) metadata timeout (existing)
 
  The request timeout has to be greater than the replication timeout.
  request timeout is for messages already sent to kafka and the
producer
 is
  waiting for them.
 
  Thanks,
 
  Mayuresh
 
  On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   I think this looks good. What I think is missing is an overview of
the
   timeouts from the user's perspective.
  
   My worry is that it is quite complicated to reason about the
current
 set
  of
   timeouts. Currently we have
  timeout.ms
  metadata.fetch.timeout.ms
  
   The proposed settings I think are:
 batch.expiration.ms
   request.timeout.ms
   replication.timeout.ms
  
   I think maybe we can skip the batch.expiration.ms. Instead maybe we
 can
   somehow combine these into a single request timeout so that we
 subtract
  the
   time you spent waiting from the request timeout and/or replication
  timeout
   somehow? I don't have an explicit proposal but my suspicion is that
 from
   the user's point of view there is just one timeout related to the
 request
   after which they don't care, and we can split that up between the
 batch
   time and the request time. Thoughts?
  
   How are we handling connection timeouts? If a machine hard fails in
 the
   middle of connection establishment there will be no outstanding
  requests. I
   think this may be okay because connections are established when we
 want
  to
   send a request and presumably we will begin the timer then?
  
   To that end I suggest we do two things:

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jiangjie Qin
Hi Jay,

I updated what I think in the KIP wiki. Just a short summary here. Because we
need timeouts for:
1. Send()
2. Batches in accumulator
3. Requests in flight.
That means we need to have at least three configurations if we do not
reuse configurations.

I think we probably also want to separate the configurations for exception
handling from those for SLA purposes.
My understanding is that the configurations we are discussing here are for
exception handling, not for SLAs. It looks to me that exception handling is
more component-oriented, while SLA tuning is more systemic. What you
suggested sounds more like setting configurations to meet a user-defined
SLA. I am not sure that is what we want to do here.

Thanks.

Jiangjie (Becket) Qin

On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah I think linger.ms remains separate; setting that is a performance
optimization rather than a failure-handling thing. We should ideally sanity
check this, though, in my proposal, since if they set linger.ms >
request.timeout then that won't work.

It's true that in my proposal the actual replication timeout we set
on
the request would be non-deterministic. However the flip side of that
argument is that in the existing proposal the actual time until an
acknowledgement is non-deterministic, right? So I think the argument I am
trying to construct is that the two things the user cares about are the
time to block and the time to ack and any other timeout we use internally
is basically an implementation detail of ensuring this.

Your point about the difference between batches and requests is a good
one.
I hadn't thought of that. So to make my proposal work we would need to do
something like base the request time off the oldest batch. Let me think
about the implications of that, it's definitely a problem.

-Jay

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Jay,

 That is also a viable solution.

 I think the main purpose is to let user know how long they can block,
 which is important.

 I have some questions about the proposal, though. Will users still need to
 set linger.ms? Will the request timeout cover linger.ms as well?
 My concern with letting the request timeout also cover the time spent in
 the accumulator is that it makes the actual request timeout
 non-deterministic.
 Also, implementation-wise, a request can contain multiple batches, and the
 time spent in the accumulator can vary a lot. If one of the batches times
 out, what should we do with the rest of the batches?
 I think we probably want to separate the batch timeout and the request
 timeout.

 Maybe we can do this:
 Max.send.block.ms
 Request.timeout
 Batch.timeout
 Replication.timeout

 So in send() we use max.send.block.ms only. In accumulator, we use
 batch.timeout; in NetworkClient, we use request.timeout. Replication
 timeout is needed anyway.

 This looks more understandable from what I can see.

 What do you think?

 Jiangjie (Becket) Qin

 On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

 So the alternative to consider would be to instead have
max.block.ms (or something)
request.timeout
replication.timeout
 
 I think this better captures what the user cares about. Here is how it
 would work.
 
*max.send.block.ms* is the bound on the maximum time the producer.send()
call can block.
 This subsumes the existing metadata timeout use case but not the
proposed
 use for the time in the accumulator. It *also* acts as a bound on the
time
 you can block on BufferPool allocation (we'd have to add this but that
 should be easy).
 
*request.timeout* is the bound on the time after send() completes until you
get an acknowledgement. This covers the connection timeout and the time in
the accumulator. So to implement this, the time we set in the request sent
via NetworkClient would have already subtracted off the time spent in the
accumulator, and if the request is retried we would include both the time
in the accumulator and the time taken for the first request, etc. In other
words this is the upper bound on the time to the Future being satisfied.
 
 *replication.timeout* will default to something reasonable but maybe
you
 can override it if you want?
 
 Thoughts?
 
 -Jay
 
 On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:
 
   So what I understand is that we would have three timeouts:
  1) replication timeout
  2) request timeout
  3) metadata timeout (existing)
 
  The request timeout has to be greater than the replication timeout.
  request timeout is for messages already sent to kafka and the
producer
 is
  waiting for them.
 
  Thanks,
 
  Mayuresh
 
  On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   I think this looks good. What I think is missing is an overview of
the
   timeouts from the user's perspective.
  
   My worry is that it is quite complicated to reason about the
current
 set
  of
  

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Mayuresh Gharat
Hi Jay,

If I am understanding this correctly, we should treat the batch timeout
separately from the request timeout. The request timeout would apply only to
in-flight requests (requests that have been sent to Kafka brokers and are
awaiting a response), and the batch timeout would be used to expire batches
in the accumulator and return a failure to the client.
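A minimal sketch of this batch-expiration idea, assuming a hypothetical accumulator whose batches carry a creation timestamp (not the actual producer code):

```python
import time

class Batch:
    """Toy stand-in for a RecordAccumulator batch; illustrative only."""
    def __init__(self, records, created_at=None):
        self.records = records
        self.created_at = time.monotonic() if created_at is None else created_at

def expire_batches(batches, batch_timeout_ms, now=None):
    """Split accumulator batches into (still_valid, expired).  Expired
    batches would have their futures failed with a timeout error and be
    returned to the client as failures; the rest stay queued.  A sketch of
    the idea in this thread, not actual Kafka client logic."""
    now = time.monotonic() if now is None else now
    valid, expired = [], []
    for b in batches:
        age_ms = (now - b.created_at) * 1000.0
        (expired if age_ms > batch_timeout_ms else valid).append(b)
    return valid, expired
```

The point of the separation is that this check runs entirely inside the accumulator, while the request timeout is enforced only once a request is actually on the wire.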


Mayuresh



On Tue, May 19, 2015 at 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Yeah I think linger.ms remains separate; setting that is a performance
 optimization rather than a failure-handling thing. We should ideally sanity
 check this, though, in my proposal, since if they set linger.ms >
 request.timeout then that won't work.

 It's true that in my proposal the actual replication timeout we set on
 the request would be non-deterministic. However the flip side of that
 argument is that in the existing proposal the actual time until an
 acknowledgement is non-deterministic, right? So I think the argument I am
 trying to construct is that the two things the user cares about are the
 time to block and the time to ack and any other timeout we use internally
 is basically an implementation detail of ensuring this.

 Your point about the difference between batches and requests is a good one.
 I hadn't thought of that. So to make my proposal work we would need to do
 something like base the request time off the oldest batch. Let me think
 about the implications of that, it's definitely a problem.

 -Jay

 On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  Hey Jay,
 
  That is also a viable solution.
 
  I think the main purpose is to let user know how long they can block,
  which is important.
 
  I have some questions about the proposal, though. Will users still need to
  set linger.ms? Will the request timeout cover linger.ms as well?
  My concern with letting the request timeout also cover the time spent in
  the accumulator is that it makes the actual request timeout
  non-deterministic.
  Also, implementation-wise, a request can contain multiple batches, and the
  time spent in the accumulator can vary a lot. If one of the batches times
  out, what should we do with the rest of the batches?
  I think we probably want to separate the batch timeout and the request
  timeout.
 
  Maybe we can do this:
  Max.send.block.ms
  Request.timeout
  Batch.timeout
  Replication.timeout
 
  So in send() we use max.send.block.ms only. In accumulator, we use
  batch.timeout; in NetworkClient, we use request.timeout. Replication
  timeout is needed anyway.
 
  This looks more understandable from what I can see.
 
  What do you think?
 
  Jiangjie (Becket) Qin
 
  On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  So the alternative to consider would be to instead have
 max.block.ms (or something)
 request.timeout
 replication.timeout
  
  I think this better captures what the user cares about. Here is how it
  would work.
  
  *max.send.block.ms* is the bound on the maximum time the producer.send()
  call can block.
  This subsumes the existing metadata timeout use case but not the
 proposed
  use for the time in the accumulator. It *also* acts as a bound on the
 time
  you can block on BufferPool allocation (we'd have to add this but that
  should be easy).
  
  *request.timeout* is the bound on the time after send() completes until you
  get an acknowledgement. This covers the connection timeout and the time in
  the accumulator. So to implement this, the time we set in the request sent
  via NetworkClient would have already subtracted off the time spent in the
  accumulator, and if the request is retried we would include both the time
  in the accumulator and the time taken for the first request, etc. In other
  words this is the upper bound on the time to the Future being satisfied.
  
  *replication.timeout* will default to something reasonable but maybe you
  can override it if you want?
  
  Thoughts?
  
  -Jay
  
  On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
  gharatmayures...@gmail.com wrote:
  
    So what I understand is that we would have three timeouts:
   1) replication timeout
   2) request timeout
   3) metadata timeout (existing)
  
   The request timeout has to be greater than the replication timeout.
   request timeout is for messages already sent to kafka and the producer
  is
   waiting for them.
  
   Thanks,
  
   Mayuresh
  
   On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com
  wrote:
  
I think this looks good. What I think is missing is an overview of
 the
timeouts from the user's perspective.
   
My worry is that it is quite complicated to reason about the current
  set
   of
timeouts. Currently we have
   timeout.ms
   metadata.fetch.timeout.ms
   
The proposed settings I think are:
  batch.expiration.ms
request.timeout.ms
replication.timeout.ms
   
I think maybe we can skip the batch.expiration.ms. Instead maybe we
 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jay Kreps
Yeah I think linger.ms remains separate; setting that is a performance
optimization rather than a failure-handling thing. We should ideally sanity
check this, though, in my proposal, since if they set linger.ms >
request.timeout then that won't work.

It's true that in my proposal the actual replication timeout we set on
the request would be non-deterministic. However the flip side of that
argument is that in the existing proposal the actual time until an
acknowledgement is non-deterministic, right? So I think the argument I am
trying to construct is that the two things the user cares about are the
time to block and the time to ack and any other timeout we use internally
is basically an implementation detail of ensuring this.

Your point about the difference between batches and requests is a good one.
I hadn't thought of that. So to make my proposal work we would need to do
something like base the request time off the oldest batch. Let me think
about the implications of that, it's definitely a problem.
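Basing the request timeout on the oldest batch might look roughly like this; the names and shapes are illustrative, not the actual client internals:

```python
import time

def request_timeout_for(batches, request_timeout_ms, now=None):
    """Compute the timeout for a network request containing several batches,
    anchored on the oldest batch, as Jay suggests above.  The time already
    spent in the accumulator is subtracted, so the oldest batch never waits
    longer than request_timeout_ms in total.  Illustrative sketch only;
    batches are modeled as dicts with a 'created_at' monotonic timestamp."""
    now = time.monotonic() if now is None else now
    oldest_created = min(b["created_at"] for b in batches)
    elapsed_ms = (now - oldest_created) * 1000.0
    return max(0.0, request_timeout_ms - elapsed_ms)
```

A returned value of 0 corresponds to the problem Jay flags: the oldest batch has already used up the whole budget, so the request should fail immediately rather than be sent, while newer batches in the same request get penalized by the oldest one's wait.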

-Jay

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Jay,

 That is also a viable solution.

 I think the main purpose is to let user know how long they can block,
 which is important.

 I have some questions about the proposal, though. Will users still need to
 set linger.ms? Will the request timeout cover linger.ms as well?
 My concern with letting the request timeout also cover the time spent in
 the accumulator is that it makes the actual request timeout
 non-deterministic.
 Also, implementation-wise, a request can contain multiple batches, and the
 time spent in the accumulator can vary a lot. If one of the batches times
 out, what should we do with the rest of the batches?
 I think we probably want to separate the batch timeout and the request
 timeout.

 Maybe we can do this:
 Max.send.block.ms
 Request.timeout
 Batch.timeout
 Replication.timeout

 So in send() we use max.send.block.ms only. In accumulator, we use
 batch.timeout; in NetworkClient, we use request.timeout. Replication
 timeout is needed anyway.

 This looks more understandable from what I can see.

 What do you think?

 Jiangjie (Becket) Qin

 On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

 So the alternative to consider would be to instead have
max.block.ms (or something)
request.timeout
replication.timeout
 
 I think this better captures what the user cares about. Here is how it
 would work.
 
 *max.send.block.ms* is the bound on the maximum time the producer.send()
 call can block.
 This subsumes the existing metadata timeout use case but not the proposed
 use for the time in the accumulator. It *also* acts as a bound on the time
 you can block on BufferPool allocation (we'd have to add this but that
 should be easy).
 
 *request.timeout* is the bound on the time after send() completes until you
 get an acknowledgement. This covers the connection timeout and the time in
 the accumulator. So to implement this, the time we set in the request sent
 via NetworkClient would have already subtracted off the time spent in the
 accumulator, and if the request is retried we would include both the time
 in the accumulator and the time taken for the first request, etc. In other
 words this is the upper bound on the time to the Future being satisfied.
 
 *replication.timeout* will default to something reasonable but maybe you
 can override it if you want?
 
 Thoughts?
 
 -Jay
 
 On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:
 
   So what I understand is that we would have three timeouts:
  1) replication timeout
  2) request timeout
  3) metadata timeout (existing)
 
  The request timeout has to be greater than the replication timeout.
  request timeout is for messages already sent to kafka and the producer
 is
  waiting for them.
 
  Thanks,
 
  Mayuresh
 
  On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   I think this looks good. What I think is missing is an overview of the
   timeouts from the user's perspective.
  
   My worry is that it is quite complicated to reason about the current
 set
  of
   timeouts. Currently we have
  timeout.ms
  metadata.fetch.timeout.ms
  
   The proposed settings I think are:
 batch.expiration.ms
   request.timeout.ms
   replication.timeout.ms
  
   I think maybe we can skip the batch.expiration.ms. Instead maybe we
 can
   somehow combine these into a single request timeout so that we
 subtract
  the
   time you spent waiting from the request timeout and/or replication
  timeout
   somehow? I don't have an explicit proposal but my suspicion is that
 from
   the user's point of view there is just one timeout related to the
 request
   after which they don't care, and we can split that up between the
 batch
   time and the request time. Thoughts?
  
   How are we handling connection timeouts? If a machine hard fails in
 the
   middle of connection establishment there 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Mayuresh Gharat
So what I understand is that we would have three timeouts:
1) replication timeout
2) request timeout
3) metadata timeout (existing)

The request timeout has to be greater than the replication timeout. The
request timeout applies to messages already sent to Kafka that the producer
is still waiting on.
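That ordering constraint could be sanity-checked at configuration time; a sketch using the config names proposed in this thread (not final Kafka names):

```python
def validate_timeouts(config):
    """Sanity-check the timeout ordering discussed above: the client-side
    request timeout must exceed the broker-side replication timeout, or the
    client could give up on requests the broker still considers in progress.
    Config keys follow this thread's proposals; illustrative only."""
    request = config["request.timeout.ms"]
    replication = config["replication.timeout.ms"]
    if request <= replication:
        raise ValueError(
            "request.timeout.ms (%d) must be greater than "
            "replication.timeout.ms (%d)" % (request, replication))
```

Failing fast at startup like this is one way to address Ewen's worry about users being able to set invalid combinations of the timeouts.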

Thanks,

Mayuresh

On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I think this looks good. What I think is missing is an overview of the
 timeouts from the user's perspective.

 My worry is that it is quite complicated to reason about the current set of
 timeouts. Currently we have
timeout.ms
metadata.fetch.timeout.ms

 The proposed settings I think are:
   batch.expiration.ms
 request.timeout.ms
 replication.timeout.ms

 I think maybe we can skip the batch.expiration.ms. Instead maybe we can
 somehow combine these into a single request timeout so that we subtract the
 time you spent waiting from the request timeout and/or replication timeout
 somehow? I don't have an explicit proposal but my suspicion is that from
 the user's point of view there is just one timeout related to the request
 after which they don't care, and we can split that up between the batch
 time and the request time. Thoughts?

 How are we handling connection timeouts? If a machine hard fails in the
 middle of connection establishment there will be no outstanding requests. I
 think this may be okay because connections are established when we want to
 send a request and presumably we will begin the timer then?

 To that end I suggest we do two things:
  1. Include KAFKA-1788. I know that technically these two things are
 different but from the user's point of view they aren't.
 2. Include in the KIP the explanation to the user of the full set of
 timeouts, what they mean, how we will default them, and when to override
 which.

 I know this is a hassle but I think the end experience will be a lot better
 if we go through this thought process.

 -Jay

 On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  I modified the wiki page to incorporate the feedback from the mailing list
  and the KIP hangout.
 
  - Added the deprecation plan for TIMEOUT_CONFIG
  - Added the actions to take after request timeout
 
  I finally chose to create a new connection if requests time out. The
  reason is:
  1. In most cases, if a broker is just slow, as long as we set request
  timeout to be a reasonable value, we should not see many new connections
  get created.
  2. If a broker is down, hopefully metadata refresh will find the new
  broker and we will not try to reconnect to the broker anymore.
 
  Comments are welcome!
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:
 
  +1 Becket. That would give enough time for clients to move. We should
 make
  this change very clear.
  
  Thanks,
  
  Mayuresh
  
  On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
  
   Hey Ewen,
  
   Very good summary about the compatibility. What you proposed makes
  sense.
   So basically we can do the following:
  
   In next release, i.e. 0.8.3:
   1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
   2. Mark TIMEOUT_CONFIG as deprecated
   3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
   defined and give a warning about deprecation.
   In the release after 0.8.3, we remove TIMEOUT_CONFIG.
  
   This should give enough buffer for this change.
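The override-with-warning step of the plan above might look roughly like this sketch (the constant names mirror the discussion; the resolution helper itself is hypothetical):

```python
import warnings

TIMEOUT_CONFIG = "timeout.ms"                          # deprecated name
REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms"  # new name
DEFAULT_REPLICATION_TIMEOUT_MS = 30_000                # assumed default

def resolve_replication_timeout(user_config):
    """Sketch of step 3 of the plan above: if the deprecated timeout.ms is
    set, use it as replication.timeout.ms and warn about the deprecation;
    otherwise fall back to the new config or its default."""
    if TIMEOUT_CONFIG in user_config:
        warnings.warn("%s is deprecated; use %s instead"
                      % (TIMEOUT_CONFIG, REPLICATION_TIMEOUT_CONFIG),
                      DeprecationWarning)
        return user_config[TIMEOUT_CONFIG]
    return user_config.get(REPLICATION_TIMEOUT_CONFIG,
                           DEFAULT_REPLICATION_TIMEOUT_MS)
```

Having the old key win (with a warning) preserves existing config files for one release before the key is removed.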
  
    Request timeout is a completely new thing we are adding to fix a bug; I'm
    with you that it does not make sense to have it maintain the old buggy
    behavior. So we can set it to a reasonable value instead of infinite.
  
   Jiangjie (Becket) Qin
  
   On 5/12/15, 12:03 PM, Ewen Cheslack-Postava e...@confluent.io
  wrote:
  
   I think my confusion is coming from this:
   
So in this KIP, we only address (3). The only public interface
 change
   is a
new configuration of request timeout (and maybe change the
  configuration
name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
   
   There are 3 possible compatibility issues I see here:
   
   * I assumed this meant the constants also change, so timeout.ms
  becomes
   
   replication.timeout.ms. This breaks config files that worked on the
   previous version and the only warning would be in release notes. We
 do
   warn
   about unused configs so they might notice the problem.
   
   * Binary and source compatibility if someone configures their client
 in
   code and uses the TIMEOUT_CONFIG variable. Renaming it will cause
  existing
   jars to break if you try to run against an updated client (which
 seems
  not
   very significant since I doubt people upgrade these without
 recompiling
   but
   maybe I'm wrong about that). And it breaks builds without have
  deprecated
   that field first, which again, is probably not the biggest issue but
 is
   annoying for users and when we accidentally changed the 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jay Kreps
Here is the concern I had with reusing the metadata.fetch.timeout.ms:

Previously people were using this as a bound on the time send() would
block. It isn't a bound on the time we will wait on a metadata request,
just the time the send call will block if metadata is missing for the
topic. We told people who wanted a guarantee of no blocking to basically
preinitialize metadata and set this timeout to 0. However I think now this
will have a slightly different side effect which is to kill any request
immediately for a leaderless partition even though that request is safely
buffered in the record accumulator and no blocking will occur. People using
the setting in the original way would now get a bit of a surprise.

This may actually be okay and there is always a tradeoff between simplicity
and control.
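The pre-initialization pattern Jay describes, where metadata is fetched up front so that a zero timeout never blocks send(), can be modeled with a toy cache. All names here are illustrative; the real client does this through partitionsFor() and its internal waitOnMetadata:

```python
class TopicMetadataCache:
    """Toy model of 'preinitialize metadata and set the timeout to 0':
    a later lookup with a zero timeout either hits the cache immediately
    or fails immediately, but never blocks.  Not actual Kafka client code."""

    def __init__(self):
        self._partitions = {}

    def prefetch(self, topic, partitions):
        # In the real client this is a blocking metadata request at startup.
        self._partitions[topic] = partitions

    def wait_on_metadata(self, topic, timeout_ms):
        if topic in self._partitions:
            return self._partitions[topic]
        if timeout_ms == 0:
            raise TimeoutError("no metadata for %s and timeout is 0" % topic)
        # ... otherwise block up to timeout_ms for a metadata update ...
        raise TimeoutError("metadata fetch timed out for %s" % topic)

cache = TopicMetadataCache()
cache.prefetch("events", [0, 1, 2])
# send() path with timeout 0 now returns immediately instead of blocking:
assert cache.wait_on_metadata("events", timeout_ms=0) == [0, 1, 2]
```

Jay's concern is precisely that reusing this setting for leaderless partitions changes its meaning: a zero value would start failing buffered requests that would never have blocked in the first place.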

-Jay

On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I think this looks good. What I think is missing is an overview of the
 timeouts from the user's perspective.

 My worry is that it is quite complicated to reason about the current set
 of timeouts. Currently we have
timeout.ms
metadata.fetch.timeout.ms

 The proposed settings I think are:
   batch.expiration.ms
 request.timeout.ms
 replication.timeout.ms

 I think maybe we can skip the batch.expiration.ms. Instead maybe we can
 somehow combine these into a single request timeout so that we subtract the
 time you spent waiting from the request timeout and/or replication timeout
 somehow? I don't have an explicit proposal but my suspicion is that from
 the user's point of view there is just one timeout related to the request
 after which they don't care, and we can split that up between the batch
 time and the request time. Thoughts?

 How are we handling connection timeouts? If a machine hard fails in the
 middle of connection establishment there will be no outstanding requests. I
 think this may be okay because connections are established when we want to
 send a request and presumably we will begin the timer then?

 To that end I suggest we do two things:
  1. Include KAFKA-1788. I know that technically these two things are
 different but from the user's point of view they aren't.
 2. Include in the KIP the explanation to the user of the full set of
 timeouts, what they mean, how we will default them, and when to override
 which.

 I know this is a hassle but I think the end experience will be a lot
 better if we go through this thought process.

 -Jay

 On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

 I modified the wiki page to incorporate the feedback from the mailing list
 and the KIP hangout.

 - Added the deprecation plan for TIMEOUT_CONFIG
 - Added the actions to take after request timeout

 I finally chose to create a new connection if requests time out. The reason
 is:
 1. In most cases, if a broker is just slow, as long as we set request
 timeout to be a reasonable value, we should not see many new connections
 get created.
 2. If a broker is down, hopefully metadata refresh will find the new
 broker and we will not try to reconnect to the broker anymore.

 Comments are welcome!

 Thanks.

 Jiangjie (Becket) Qin

 On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:

 +1 Becket. That would give enough time for clients to move. We should
 make
 this change very clear.
 
 Thanks,
 
 Mayuresh
 
 On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hey Ewen,
 
  Very good summary about the compatibility. What you proposed makes
 sense.
  So basically we can do the following:
 
  In next release, i.e. 0.8.3:
  1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
  2. Mark TIMEOUT_CONFIG as deprecated
  3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
  defined and give a warning about deprecation.
  In the release after 0.8.3, we remove TIMEOUT_CONFIG.
 
  This should give enough buffer for this change.
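For illustration only, the override-and-deprecate rule in step 3 might be sketched like this (hypothetical helper name and default value, not the actual ProducerConfig code):

```python
# Hypothetical sketch of the override rule: if the deprecated timeout.ms is
# set, it wins and we warn; otherwise replication.timeout.ms (or an assumed
# default) is used.
import warnings

DEFAULT_REPLICATION_TIMEOUT_MS = 30000  # assumed default, for illustration

def resolve_replication_timeout(config: dict) -> int:
    if "timeout.ms" in config:  # deprecated name
        warnings.warn("timeout.ms is deprecated; use replication.timeout.ms",
                      DeprecationWarning)
        return int(config["timeout.ms"])
    return int(config.get("replication.timeout.ms",
                          DEFAULT_REPLICATION_TIMEOUT_MS))

print(resolve_replication_timeout({"timeout.ms": "15000"}))              # 15000
print(resolve_replication_timeout({"replication.timeout.ms": "20000"}))  # 20000
print(resolve_replication_timeout({}))                                   # 30000
```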
 
  Request timeout is a completely new thing we are adding to fix a bug; I’m
  with you that it does not make sense to have it maintain the old buggy
  behavior, so we can set it to a reasonable value instead of infinite.
 
  Jiangjie (Becket) Qin
 
  On 5/12/15, 12:03 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  I think my confusion is coming from this:
  
   So in this KIP, we only address (3). The only public interface
 change
  is a
   new configuration of request timeout (and maybe change the
 configuration
   name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
  
  There are 3 possible compatibility issues I see here:
  
  * I assumed this meant the constants also change, so timeout.ms
 becomes
  
  replication.timeout.ms. This breaks config files that worked on the
  previous version and the only warning would be in release notes. We do
  warn
  about unused configs so they might notice the problem.
  
  * Binary and source compatibility if someone configures their client
 in
  code and uses the 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jay Kreps
So the alternative to consider would be to instead have
   max.block.ms (or something)
   request.timeout
   replication.timeout

I think this better captures what the user cares about. Here is how it
would work.

*max.send.block.ms* is the bound on the maximum time the producer.send()
call can block.
This subsumes the existing metadata timeout use case but not the proposed
use for the time in the accumulator. It *also* acts as a bound on the time
you can block on BufferPool allocation (we'd have to add this but that
should be easy).

*request.timeout* is the bound on the time after send() completes until you
get an acknowledgement. This covers the connection timeout and the time in
the accumulator. So to implement this, the time we set in the request sent
via NetworkClient would have already subtracted off the time spent in the
accumulator, and if the request is retried we would include both the time in
the accumulator and the time taken for the first request, etc. In other
words this is the upper bound on the time to the Future being satisfied.
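A minimal sketch of this subtract-as-you-go accounting, assuming a simple deadline object (illustrative only, not the NetworkClient implementation):

```python
import time

class RequestDeadline:
    """Tracks one request.timeout budget across accumulator wait and retries."""
    def __init__(self, request_timeout_ms: int):
        self.deadline = time.monotonic() + request_timeout_ms / 1000.0

    def remaining_ms(self) -> float:
        # Budget left after time already spent (accumulator, earlier tries).
        return max(0.0, (self.deadline - time.monotonic()) * 1000.0)

    def expired(self) -> bool:
        return self.remaining_ms() == 0.0

# The network-level timeout for each (re)try is whatever is left,
# not the full request.timeout again.
d = RequestDeadline(request_timeout_ms=500)
time.sleep(0.1)            # pretend the batch sat in the accumulator
assert d.remaining_ms() < 500
assert not d.expired()
```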

*replication.timeout* will default to something reasonable but maybe you
can override it if you want?

Thoughts?

-Jay

On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
gharatmayures...@gmail.com wrote:

 So what I understand is that we would have three timeouts:
 1) replication timeout
 2) request timeout
 3) metadata timeout (existing)

 The request timeout has to be greater than the replication timeout. The
 request timeout is for messages already sent to Kafka that the producer is
 waiting for.

 Thanks,

 Mayuresh

 On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

  I think this looks good. What I think is missing is an overview of the
  timeouts from the user's perspective.
 
  My worry is that it is quite complicated to reason about the current set
 of
  timeouts. Currently we have
 timeout.ms
 metadata.fetch.timeout.ms
 
  The proposed settings I think are:
batch.expiration.ms
  request.timeout.ms
  replication.timeout.ms
 
  I think maybe we can skip the batch.expiration.ms. Instead maybe we can
  somehow combine these into a single request timeout so that we subtract
 the
  time you spent waiting from the request timeout and/or replication
 timeout
  somehow? I don't have an explicit proposal but my suspicion is that from
  the user's point of view there is just one timeout related to the request
  after which they don't care, and we can split that up between the batch
  time and the request time. Thoughts?
 
  How are we handling connection timeouts? If a machine hard fails in the
  middle of connection establishment there will be no outstanding
 requests. I
  think this may be okay because connections are established when we want
 to
  send a request and presumably we will begin the timer then?
 
  To that end I suggest we do two things:
  1. Include KAFKA-1788. I know that technically these two things are
  different but from the user's point of view they aren't.
  2. Include in the KIP the explanation to the user of the full set of
  timeouts, what they mean, how we will default them, and when to override
  which.
 
  I know this is a hassle but I think the end experience will be a lot
 better
  if we go through this thought process.
 
  -Jay
 
  On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   I modified the WIKI page to incorporate the feedback from the mailing list
   and KIP hangout.
  
   - Added the deprecation plan for TIMEOUT_CONFIG
   - Added the actions to take after request timeout
  
   I finally chose to create a new connection if requests time out. The
  reason
   is:
   1. In most cases, if a broker is just slow, as long as we set request
   timeout to be a reasonable value, we should not see many new
 connections
   get created.
   2. If a broker is down, hopefully metadata refresh will find the new
   broker and we will not try to reconnect to the broker anymore.
  
   Comments are welcome!
  
   Thanks.
  
   Jiangjie (Becket) Qin
  
   On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com
  wrote:
  
   +1 Becket. That would give enough time for clients to move. We should
  make
   this change very clear.
   
   Thanks,
   
   Mayuresh
   
   On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
   
Hey Ewen,
   
Very good summary about the compatibility. What you proposed makes
   sense.
So basically we can do the following:
   
In next release, i.e. 0.8.3:
1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
2. Mark TIMEOUT_CONFIG as deprecated
3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
defined and give a warning about deprecation.
In the release after 0.8.3, we remove TIMEOUT_CONFIG.
   
This should give enough buffer for this change.
   
Request timeout is a complete new thing we add to fix a bug, I’m
 with
   you

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jiangjie Qin
Hey Jay,

That is also a viable solution.

I think the main purpose is to let the user know how long they can block,
which is important.

I have some questions about the proposal, though. Will the user still need to
set linger.ms? Will the request timeout cover linger.ms as well?
My concern with letting the request timeout also cover the time spent in the
accumulator is that it makes the actual request timeout nondeterministic.
Also, implementation-wise, a request can contain multiple batches, and the time
spent in the accumulator could vary a lot. If one of the batches times out,
what should we do with the rest of the batches?
I think we probably want to separate the batch timeout and the request timeout.

Maybe we can do this:
Max.send.block.ms
Request.timeout
Batch.timeout
Replication.timeout

So in send() we use max.send.block.ms only. In the accumulator, we use
batch.timeout; in NetworkClient, we use request.timeout. Replication
timeout is needed anyway.
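Under this separation the worst-case end-to-end latency is roughly the sum of the per-stage budgets; if the request timeout applies to each retry independently (as noted elsewhere in the thread), a back-of-the-envelope bound looks like this (illustrative only, with assumed numbers):

```python
def max_produce_latency_ms(max_send_block_ms: int,
                           batch_timeout_ms: int,
                           request_timeout_ms: int,
                           retries: int) -> int:
    """Worst-case time from calling send() to the Future being resolved,
    under the proposed per-stage timeouts. The request timeout applies to
    each try independently, hence the (retries + 1) factor."""
    return (max_send_block_ms                       # blocking inside send()
            + batch_timeout_ms                      # waiting in the accumulator
            + (retries + 1) * request_timeout_ms)   # network tries

# e.g. 60s send block + 30s batch + 3 tries * 30s request = 180000 ms
print(max_produce_latency_ms(60000, 30000, 30000, retries=2))  # 180000
```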

This looks more understandable from what I can see.

What do you think?

Jiangjie (Becket) Qin

On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

So the alternative to consider would be to instead have
   max.block.ms (or something)
   request.timeout
   replication.timeout

I think this better captures what the user cares about. Here is how it
would work.

*max.send.block.ms* is the bound on the maximum time the producer.send()
call can block.
This subsumes the existing metadata timeout use case but not the proposed
use for the time in the accumulator. It *also* acts as a bound on the time
you can block on BufferPool allocation (we'd have to add this but that
should be easy).

*request.timeout* is the bound on the time after send() completes until you
get an acknowledgement. This covers the connection timeout and the time in
the accumulator. So to implement this, the time we set in the request sent
via NetworkClient would have already subtracted off the time spent in the
accumulator, and if the request is retried we would include both the time in
the accumulator and the time taken for the first request, etc. In other
words this is the upper bound on the time to the Future being satisfied.

*replication.timeout* will default to something reasonable but maybe you
can override it if you want?

Thoughts?

-Jay

On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
gharatmayures...@gmail.com wrote:

 So what I understand is that we would have three timeouts:
 1) replication timeout
 2) request timeout
 3) metadata timeout (existing)

 The request timeout has to be greater than the replication timeout. The
 request timeout is for messages already sent to Kafka that the producer is
 waiting for.

 Thanks,

 Mayuresh

 On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

  I think this looks good. What I think is missing is an overview of the
  timeouts from the user's perspective.
 
  My worry is that it is quite complicated to reason about the current
set
 of
  timeouts. Currently we have
 timeout.ms
 metadata.fetch.timeout.ms
 
  The proposed settings I think are:
batch.expiration.ms
  request.timeout.ms
  replication.timeout.ms
 
  I think maybe we can skip the batch.expiration.ms. Instead maybe we
can
  somehow combine these into a single request timeout so that we
subtract
 the
  time you spent waiting from the request timeout and/or replication
 timeout
  somehow? I don't have an explicit proposal but my suspicion is that
from
  the user's point of view there is just one timeout related to the
request
  after which they don't care, and we can split that up between the
batch
  time and the request time. Thoughts?
 
  How are we handling connection timeouts? If a machine hard fails in
the
  middle of connection establishment there will be no outstanding
 requests. I
  think this may be okay because connections are established when we
want
 to
  send a request and presumably we will begin the timer then?
 
  To that end I suggest we do two things:
  1. Include KAFKA-1788. I know that technically these two things are
  different but from the user's point of view they aren't.
  2. Include in the KIP the explanation to the user of the full set of
  timeouts, what they mean, how we will default them, and when to
override
  which.
 
  I know this is a hassle but I think the end experience will be a lot
 better
  if we go through this thought process.
 
  -Jay
 
  On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
  wrote:
 
   I modified the WIKI page to incorporate the feedback from the mailing
list
   and KIP hangout.
  
   - Added the deprecation plan for TIMEOUT_CONFIG
   - Added the actions to take after request timeout
  
   I finally chose to create a new connection if requests time out. The
  reason
   is:
   1. In most cases, if a broker is just slow, as long as we set
request
   timeout to be a reasonable value, we should not see many new
 connections
   get created.
   2. If a broker is down, hopefully 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Mayuresh Gharat
IMO, having four different timeouts makes things confusing for the user and
requires the client to understand the internals of Kafka. We should have a
single timeout from the user's perspective and handle the other timeouts,
like the batch timeout, internally.

Mayuresh

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Jay,

 That is also a viable solution.

 I think the main purpose is to let the user know how long they can block,
 which is important.

 I have some questions about the proposal, though. Will the user still need to
 set linger.ms? Will the request timeout cover linger.ms as well?
 My concern with letting the request timeout also cover the time spent in the
 accumulator is that it makes the actual request timeout nondeterministic.
 Also, implementation-wise, a request can contain multiple batches, and the time
 spent in the accumulator could vary a lot. If one of the batches times out,
 what should we do with the rest of the batches?
 I think we probably want to separate the batch timeout and the request timeout.

 Maybe we can do this:
 Max.send.block.ms
 Request.timeout
 Batch.timeout
 Replication.timeout

 So in send() we use max.send.block.ms only. In the accumulator, we use
 batch.timeout; in NetworkClient, we use request.timeout. Replication
 timeout is needed anyway.

 This looks more understandable from what I can see.

 What do you think?

 Jiangjie (Becket) Qin

 On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote:

 So the alternative to consider would be to instead have
max.block.ms (or something)
request.timeout
replication.timeout
 
 I think this better captures what the user cares about. Here is how it
 would work.
 
 *max.send.block.ms* is the bound on the maximum time the producer.send()
 call can block.
 This subsumes the existing metadata timeout use case but not the proposed
 use for the time in the accumulator. It *also* acts as a bound on the time
 you can block on BufferPool allocation (we'd have to add this but that
 should be easy).
 
 *request.timeout* is the bound on the time after send() completes until you
 get an acknowledgement. This covers the connection timeout and the time in
 the accumulator. So to implement this, the time we set in the request sent
 via NetworkClient would have already subtracted off the time spent in the
 accumulator, and if the request is retried we would include both the time in
 the accumulator and the time taken for the first request, etc. In other
 words this is the upper bound on the time to the Future being satisfied.
 
 *replication.timeout* will default to something reasonable but maybe you
 can override it if you want?
 
 Thoughts?
 
 -Jay
 
 On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat 
 gharatmayures...@gmail.com wrote:
 
  So what I understand is that we would have three timeouts:
  1) replication timeout
  2) request timeout
  3) metadata timeout (existing)
 
  The request timeout has to be greater than the replication timeout. The
  request timeout is for messages already sent to Kafka that the producer is
  waiting for.
 
  Thanks,
 
  Mayuresh
 
  On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com
 wrote:
 
   I think this looks good. What I think is missing is an overview of the
   timeouts from the user's perspective.
  
   My worry is that it is quite complicated to reason about the current
 set
  of
   timeouts. Currently we have
  timeout.ms
  metadata.fetch.timeout.ms
  
   The proposed settings I think are:
 batch.expiration.ms
   request.timeout.ms
   replication.timeout.ms
  
   I think maybe we can skip the batch.expiration.ms. Instead maybe we
 can
   somehow combine these into a single request timeout so that we
 subtract
  the
   time you spent waiting from the request timeout and/or replication
  timeout
   somehow? I don't have an explicit proposal but my suspicion is that
 from
   the user's point of view there is just one timeout related to the
 request
   after which they don't care, and we can split that up between the
 batch
   time and the request time. Thoughts?
  
   How are we handling connection timeouts? If a machine hard fails in
 the
   middle of connection establishment there will be no outstanding
  requests. I
   think this may be okay because connections are established when we
 want
  to
   send a request and presumably we will begin the timer then?
  
   To that end I suggest we do two things:
   1. Include KAFKA-1788. I know that technically these two things are
   different but from the user's point of view they aren't.
   2. Include in the KIP the explanation to the user of the full set of
   timeouts, what they mean, how we will default them, and when to
 override
   which.
  
   I know this is a hassle but I think the end experience will be a lot
  better
   if we go through this thought process.
  
   -Jay
  
   On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
  
I modified the 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jay Kreps
I think this looks good. What I think is missing is an overview of the
timeouts from the user's perspective.

My worry is that it is quite complicated to reason about the current set of
timeouts. Currently we have
   timeout.ms
   metadata.fetch.timeout.ms

The proposed settings I think are:
  batch.expiration.ms
  request.timeout.ms
  replication.timeout.ms

I think maybe we can skip the batch.expiration.ms. Instead maybe we can
somehow combine these into a single request timeout so that we subtract the
time you spent waiting from the request timeout and/or replication timeout
somehow? I don't have an explicit proposal but my suspicion is that from
the user's point of view there is just one timeout related to the request
after which they don't care, and we can split that up between the batch
time and the request time. Thoughts?

How are we handling connection timeouts? If a machine hard fails in the
middle of connection establishment there will be no outstanding requests. I
think this may be okay because connections are established when we want to
send a request and presumably we will begin the timer then?

To that end I suggest we do two things:
1. Include KAFKA-1788. I know that technically these two things are
different but from the user's point of view they aren't.
2. Include in the KIP the explanation to the user of the full set of
timeouts, what they mean, how we will default them, and when to override
which.

I know this is a hassle but I think the end experience will be a lot better
if we go through this thought process.

-Jay

On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I modified the WIKI page to incorporate the feedback from the mailing list
 and KIP hangout.

 - Added the deprecation plan for TIMEOUT_CONFIG
 - Added the actions to take after request timeout

 I finally chose to create a new connection if requests time out. The reason
 is:
 1. In most cases, if a broker is just slow, as long as we set request
 timeout to be a reasonable value, we should not see many new connections
 get created.
 2. If a broker is down, hopefully metadata refresh will find the new
 broker and we will not try to reconnect to the broker anymore.

 Comments are welcome!

 Thanks.

 Jiangjie (Becket) Qin

 On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

 +1 Becket. That would give enough time for clients to move. We should make
 this change very clear.
 
 Thanks,
 
 Mayuresh
 
 On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Hey Ewen,
 
  Very good summary about the compatibility. What you proposed makes
 sense.
  So basically we can do the following:
 
  In next release, i.e. 0.8.3:
  1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
  2. Mark TIMEOUT_CONFIG as deprecated
  3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
  defined and give a warning about deprecation.
  In the release after 0.8.3, we remove TIMEOUT_CONFIG.
 
  This should give enough buffer for this change.
 
  Request timeout is a completely new thing we are adding to fix a bug; I’m
  with you that it does not make sense to have it maintain the old buggy
  behavior, so we can set it to a reasonable value instead of infinite.
 
  Jiangjie (Becket) Qin
 
  On 5/12/15, 12:03 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  I think my confusion is coming from this:
  
   So in this KIP, we only address (3). The only public interface change
  is a
   new configuration of request timeout (and maybe change the
 configuration
   name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
  
  There are 3 possible compatibility issues I see here:
  
  * I assumed this meant the constants also change, so timeout.ms
 becomes
  
  replication.timeout.ms. This breaks config files that worked on the
  previous version and the only warning would be in release notes. We do
  warn
  about unused configs so they might notice the problem.
  
  * Binary and source compatibility if someone configures their client in
  code and uses the TIMEOUT_CONFIG variable. Renaming it will cause
 existing
  jars to break if you try to run against an updated client (which seems
 not
  very significant since I doubt people upgrade these without recompiling
  but
  maybe I'm wrong about that). And it breaks builds without having
 deprecated
  that field first, which again, is probably not the biggest issue but is
  annoying for users and when we accidentally changed the API we
 received a
  complaint about breaking builds.
  
  * Behavior compatibility as Jay mentioned on the call -- setting the
  config
  (even if the name changed) doesn't have the same effect it used to.
  
  One solution, which admittedly is more painful to implement and
 maintain,
  would be to maintain the timeout.ms config, have it override the
 others
  if
  it is specified (including an infinite request timeout I guess?), and
 if
  it
  isn't specified, we can just use the new config variables. Given a 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Jiangjie Qin
Hey Jay,

I think that is a very reasonable concern.
So the current behavior for those users is:
1. Send() will go through as long as metadata is available.
2. Send() will throw exception if the metadata of a partition is lost
after pre-initialization.
3. The messages in accumulator will not be failed but wait until the
partition metadata is available again.

If we reuse metadata timeout, (1) and (2) are kept the same. Only (3) is
changed as those messages will be failed immediately when the batch is
ready.
It is probably not an issue though, because the user will get an exception
from the send() call anyway in this case. It is probably better to fail the
messages in the accumulator than to keep them in that case, because I really
cannot think of any case where the metadata of a partition can disappear and
come up again shortly.

So I guess the metadata timeout does exactly what it means - how long you
are willing to wait for metadata. It is not designed to provide a blocking
boundary for send() - we have blocking on buffer full as well. It is just
one of the dependencies in send() so send() could be blocked for metadata
timeout.
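A toy sketch of that semantic, using hypothetical names rather than the producer's actual code: send() simply bounds its wait for topic metadata, so preinitialized metadata plus a timeout of 0 never blocks:

```python
import threading, time

class MetadataCache:
    """Toy stand-in for producer metadata: a per-topic availability flag."""
    def __init__(self):
        self._ready = {}
        self._cond = threading.Condition()

    def mark_available(self, topic: str):
        with self._cond:
            self._ready[topic] = True
            self._cond.notify_all()

    def await_topic(self, topic: str, timeout_ms: int) -> None:
        # This is all the metadata timeout bounds: the wait for metadata
        # inside send(), not the metadata request itself.
        deadline = time.monotonic() + timeout_ms / 1000.0
        with self._cond:
            while not self._ready.get(topic):
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"metadata for {topic} not available")
                self._cond.wait(remaining)

cache = MetadataCache()
cache.mark_available("preinitialized-topic")
cache.await_topic("preinitialized-topic", timeout_ms=0)  # returns immediately
```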

I totally agree we should explain all the timeouts clearly. I think we are
fine as long as we make sure the configuration is used for what it sounds
to be used and articulate the impacts of those configurations. I’ll check
what would happen if a broker is down when we try to connect to it as well.

Thanks.

Jiangjie (Becket) Qin


On 5/19/15, 11:38 AM, Jay Kreps jay.kr...@gmail.com wrote:

Here is the concern I had with reusing the metadata.fetch.timeout.ms:

Previously people were using this as a bound on the time send() would
block. It isn't a bound on the time we will wait on a metadata request,
just the time the send call will block if metadata is missing for the
topic. We told people who wanted a guarantee of no blocking to basically
preinitialize metadata and set this timeout to 0. However I think now this
will have a slightly different side effect which is to kill any request
immediately for a leaderless partition even though that request is safely
buffered in the record accumulator and no blocking will occur. People
using
the setting in the original way would now get a bit of a surprise.

This may actually be okay and there is always a tradeoff between
simplicity
and control.

-Jay

On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I think this looks good. What I think is missing is an overview of the
 timeouts from the user's perspective.

 My worry is that it is quite complicated to reason about the current set
 of timeouts. Currently we have
timeout.ms
metadata.fetch.timeout.ms

 The proposed settings I think are:
   batch.expiration.ms
 request.timeout.ms
 replication.timeout.ms

 I think maybe we can skip the batch.expiration.ms. Instead maybe we can
 somehow combine these into a single request timeout so that we subtract
the
 time you spent waiting from the request timeout and/or replication
timeout
 somehow? I don't have an explicit proposal but my suspicion is that from
 the user's point of view there is just one timeout related to the
request
 after which they don't care, and we can split that up between the batch
 time and the request time. Thoughts?

 How are we handling connection timeouts? If a machine hard fails in the
 middle of connection establishment there will be no outstanding
requests. I
 think this may be okay because connections are established when we want
to
 send a request and presumably we will begin the timer then?

 To that end I suggest we do two things:
 1. Include KAFKA-1788. I know that technically these two things are
 different but from the user's point of view they aren't.
 2. Include in the KIP the explanation to the user of the full set of
 timeouts, what they mean, how we will default them, and when to override
 which.

 I know this is a hassle but I think the end experience will be a lot
 better if we go through this thought process.

 -Jay

 On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:

 I modified the WIKI page to incorporate the feedback from the mailing list
 and KIP hangout.

 - Added the deprecation plan for TIMEOUT_CONFIG
 - Added the actions to take after request timeout

 I finally chose to create a new connection if requests time out. The
reason
 is:
 1. In most cases, if a broker is just slow, as long as we set request
 timeout to be a reasonable value, we should not see many new
connections
 get created.
 2. If a broker is down, hopefully metadata refresh will find the new
 broker and we will not try to reconnect to the broker anymore.

 Comments are welcome!

 Thanks.

 Jiangjie (Becket) Qin

 On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:

 +1 Becket. That would give enough time for clients to move. We should
 make
 this change very clear.
 
 Thanks,
 
 Mayuresh
 
 On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-19 Thread Mayuresh Gharat
Hi Jiangjie,

So when you say: "It is probably better to fail the messages in accumulator
than keeping them in that case because I really cannot think of any case
where metadata of a partition can disappear and come up again shortly,"
this is true *unless there is a metadata refresh that occurs during that
interval, right?*
Thanks,

Mayuresh

On Tue, May 19, 2015 at 12:26 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Jay,

 I think that is a very reasonable concern.
 So the current behavior for those users is:
 1. Send() will go through as long as metadata is available.
 2. Send() will throw exception if the metadata of a partition is lost
 after pre-initialization.
 3. The messages in accumulator will not be failed but wait until the
 partition metadata is available again.

 If we reuse metadata timeout, (1) and (2) are kept the same. Only (3) is
 changed as those messages will be failed immediately when the batch is
 ready.
 It is probably not an issue though, because the user will get an exception
 from the send() call anyway in this case. It is probably better to fail the
 messages in the accumulator than to keep them in that case, because I really
 cannot think of any case where the metadata of a partition can disappear and
 come up again shortly.

 So I guess the metadata timeout does exactly what it means - how long you
 are willing to wait for metadata. It is not designed to provide a blocking
 boundary for send() - we have blocking on buffer full as well. It is just
 one of the dependencies in send() so send() could be blocked for metadata
 timeout.

 I totally agree we should explain all the timeouts clearly. I think we are
 fine as long as we make sure the configuration is used for what it sounds
 to be used and articulate the impacts of those configurations. I’ll check
 what would happen if a broker is down when we try to connect to it as well.

 Thanks.

 Jiangjie (Becket) Qin


 On 5/19/15, 11:38 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Here is the concern I had with reusing the metadata.fetch.timeout.ms:
 
 Previously people were using this as a bound on the time send() would
 block. It isn't a bound on the time we will wait on a metadata request,
 just the time the send call will block if metadata is missing for the
 topic. We told people who wanted a guarantee of no blocking to basically
 preinitialize metadata and set this timeout to 0. However I think now this
 will have a slightly different side effect which is to kill any request
 immediately for a leaderless partition even though that request is safely
 buffered in the record accumulator and no blocking will occur. People
 using
 the setting in the original way would now get a bit of a surprise.
 
 This may actually be okay and there is always a tradeoff between
 simplicity
 and control.
 
 -Jay
 
 On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  I think this looks good. What I think is missing is an overview of the
  timeouts from the user's perspective.
 
  My worry is that it is quite complicated to reason about the current set
  of timeouts. Currently we have
 timeout.ms
 metadata.fetch.timeout.ms
 
  The proposed settings I think are:
batch.expiration.ms
  request.timeout.ms
  replication.timeout.ms
 
  I think maybe we can skip the batch.expiration.ms. Instead maybe we can
  somehow combine these into a single request timeout so that we subtract
 the
  time you spent waiting from the request timeout and/or replication
 timeout
  somehow? I don't have an explicit proposal but my suspicion is that from
  the user's point of view there is just one timeout related to the
 request
  after which they don't care, and we can split that up between the batch
  time and the request time. Thoughts?
 
  How are we handling connection timeouts? If a machine hard fails in the
  middle of connection establishment there will be no outstanding
 requests. I
  think this may be okay because connections are established when we want
 to
  send a request and presumably we will begin the timer then?
 
  To that end I suggest we do two things:
  1. Include KAFKA-1788. I know that technically these two things are
  different but from the user's point of view they aren't.
  2. Include in the KIP the explanation to the user of the full set of
  timeouts, what they mean, how we will default them, and when to override
  which.
 
  I know this is a hassle but I think the end experience will be a lot
  better if we go through this thought process.
 
  -Jay
 
  On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
 
  I modified the WIKI page to incorporate the feedback from the mailing list
  and KIP hangout.
 
  - Added the deprecation plan for TIMEOUT_CONFIG
  - Added the actions to take after request timeout
 
  I finally chose to create a new connection if requests timeout. The
 reason
  is:
  1. In most cases, if a broker is just slow, as long as we set request
  timeout to be a reasonable value, we 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-15 Thread Mayuresh Gharat
+1 on creating a new connection.
So from what I understand the request timeout should be greater than
the replication timeout in any case.

If the broker is slow or not responding and the request times out, we will
treat it as we treat disconnections: update the metadata and try sending to
the new leader, or to the same broker on a new connection.

Thanks,

Mayuresh
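The timeout-as-disconnection handling described above might be sketched as
follows. This is a hypothetical illustration, not the actual NetworkClient
code: the method shape, the action names, and the 30-second timeout value are
all made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: treat an expired in-flight request like a disconnection.
public class TimeoutHandling {
    static final long REQUEST_TIMEOUT_MS = 30_000; // illustrative value

    // Decide what to do for a single in-flight request on each poll.
    static List<String> onPoll(long nowMs, long requestSendTimeMs, int retriesLeft) {
        List<String> actions = new ArrayList<>();
        if (nowMs - requestSendTimeMs < REQUEST_TIMEOUT_MS) {
            return actions; // still within the timeout; keep waiting
        }
        // Past the timeout: handle it exactly like a disconnection.
        actions.add("close-connection");        // drop the possibly dead socket
        actions.add("request-metadata-update"); // the leader may have moved
        if (retriesLeft > 0) {
            // Resend to the new leader, or to the same broker on a new connection.
            actions.add("retry-on-new-connection");
        } else {
            actions.add("fail-batch"); // out of retries: surface the error
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(onPoll(40_000, 0, 1)); // timed out, retries left
        System.out.println(onPoll(1_000, 0, 1));  // still waiting
    }
}
```

Note that this also implies the ordering constraint above: with the request
timeout larger than the replication timeout, a broker that is merely slow to
replicate answers before the client tears the connection down.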

On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I modified the WIKI page to incorporate the feedback from the mailing list
 and KIP hangout.

 - Added the deprecation plan for TIMEOUT_CONFIG
 - Added the actions to take after request timeout

 I finally chose to create a new connection if requests timeout. The reason
 is:
 1. In most cases, if a broker is just slow, as long as we set request
 timeout to be a reasonable value, we should not see many new connections
 get created.
 2. If a broker is down, hopefully metadata refresh will find the new
 broker and we will not try to reconnect to the broker anymore.

 Comments are welcome!

 Thanks.

 Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-15 Thread Jiangjie Qin
I modified the WIKI page to incorporate the feedback from the mailing list
and KIP hangout.

- Added the deprecation plan for TIMEOUT_CONFIG
- Added the actions to take after request timeout

I finally chose to create a new connection if requests timeout. The reason
is:
1. In most cases, if a broker is just slow, as long as we set request
timeout to be a reasonable value, we should not see many new connections
get created. 
2. If a broker is down, hopefully metadata refresh will find the new
broker and we will not try to reconnect to the broker anymore.

Comments are welcome!

Thanks.

Jiangjie (Becket) Qin

On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

+1 Becket. That would give enough time for clients to move. We should make
this change very clear.

Thanks,

Mayuresh


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-12 Thread Ewen Cheslack-Postava
I think my confusion is coming from this:

 So in this KIP, we only address (3). The only public interface change is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

There are 3 possible compatibility issues I see here:

* I assumed this meant the constants also change, so timeout.ms becomes 
replication.timeout.ms. This breaks config files that worked on the
previous version and the only warning would be in release notes. We do warn
about unused configs so they might notice the problem.

* Binary and source compatibility if someone configures their client in
code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing
jars to break if you try to run against an updated client (which seems not
very significant since I doubt people upgrade these without recompiling but
maybe I'm wrong about that). And it breaks builds without having deprecated
that field first, which again is probably not the biggest issue but is
annoying for users; when we accidentally changed the API before, we received a
complaint about breaking builds.

* Behavior compatibility as Jay mentioned on the call -- setting the config
(even if the name changed) doesn't have the same effect it used to.

One solution, which admittedly is more painful to implement and maintain,
would be to maintain the timeout.ms config, have it override the others if
it is specified (including an infinite request timeout I guess?), and if it
isn't specified, we can just use the new config variables. Given a real
deprecation schedule, users would have better warning of changes and a
window to make the changes.
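The override scheme just described could look roughly like this. This is a
hypothetical sketch of config resolution, not the real ProducerConfig
machinery: replication.timeout.ms and request.timeout.ms are the names
proposed in this thread, and the default values are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical compatibility shim: deprecated timeout.ms, if present,
// overrides replication.timeout.ms; the new request.timeout.ms gets a finite
// default rather than preserving the old infinite behavior.
public class LegacyTimeoutResolver {
    static Map<String, String> resolve(Map<String, String> user) {
        Map<String, String> cfg = new HashMap<>(user);
        if (user.containsKey("timeout.ms")) {
            System.err.println("timeout.ms is deprecated; use replication.timeout.ms");
            cfg.put("replication.timeout.ms", user.get("timeout.ms"));
        }
        cfg.putIfAbsent("replication.timeout.ms", "30000");
        cfg.putIfAbsent("request.timeout.ms", "30000");
        return cfg;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("timeout.ms", "5000")));
    }
}
```

If both the old and the new name are set, the deprecated one wins here,
matching the "have it override the others if it is specified" behavior
described above, while still giving users a deprecation warning and a window
to migrate.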

I actually think it might not be necessary to maintain the old behavior
precisely, although maybe for some code it is an issue if they start seeing
timeout exceptions that they wouldn't have seen before?

-Ewen

On Wed, May 6, 2015 at 6:06 PM, Jun Rao j...@confluent.io wrote:

 Jiangjie,

 Yes, I think using metadata timeout to expire batches in the record
 accumulator makes sense.

 Thanks,

 Jun

 On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
  up this KIP because currently we often see mirror-maker hang when a broker
  is down.
 
  I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
  timeout to expire the batches which are sitting in accumulator without
  leader info. I did that because the situation there is essentially
 missing
  metadata.
 
  As a summary of what I am thinking about the timeout in new Producer:
 
  1. Metadata timeout:
- used in send(), blocking
- used in accumulator to expire batches with timeout exception.
  2. Linger.ms
- Used in accumulator to ready the batch for drain
  3. Request timeout
- Used in NetworkClient to expire a batch and retry if no response is
  received for a request before timeout.
 
  So in this KIP, we only address (3). The only public interface change is
 a
  new configuration of request timeout (and maybe change the configuration
  name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
 
  Would like to see what people think of above approach?
 
  Jiangjie (Becket) Qin
 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-12 Thread Mayuresh Gharat
+1 Becket. That would give enough time for clients to move. We should make
this change very clear.

Thanks,

Mayuresh

On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hey Ewen,

 Very good summary about the compatibility. What you proposed makes sense.
 So basically we can do the following:

 In next release, i.e. 0.8.3:
 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
 2. Mark TIMEOUT_CONFIG as deprecated
 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
 defined and give a warning about deprecation.
 In the release after 0.8.3, we remove TIMEOUT_CONFIG.

 This should give enough buffer for this change.

 Request timeout is a completely new thing we are adding to fix a bug; I'm
 with you that it does not make sense to have it maintain the old buggy
 behavior. So we can set it to a reasonable value instead of infinite.

 Jiangjie (Becket) Qin


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-06 Thread Jiangjie Qin
Yes, that is the plan.

On 5/5/15, 8:23 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

Just a quick question, can we handle REQUEST TIMEOUT as disconnections and
do a fresh MetaDataRequest and retry instead of failing the request?


Thanks,

Mayuresh


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-06 Thread Jun Rao
Jiangjie,

Yes, I think using metadata timeout to expire batches in the record
accumulator makes sense.

Thanks,

Jun
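Jun's suggestion above — using the metadata timeout to expire batches that sit
in the accumulator without leader info — might be sketched like this. Batch
and expire are hypothetical simplifications for illustration, not
RecordAccumulator's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the KAFKA-2142 approach: fail batches that have
// waited longer than the metadata timeout without ever learning a leader.
public class AccumulatorExpiry {
    static class Batch {
        final long createdMs;
        final boolean hasLeader;
        Batch(long createdMs, boolean hasLeader) {
            this.createdMs = createdMs;
            this.hasLeader = hasLeader;
        }
    }

    // Returns the batches to fail with a timeout exception. Batches that have
    // a leader are left alone; once drained, the request timeout covers them.
    static List<Batch> expire(List<Batch> batches, long nowMs, long metadataTimeoutMs) {
        List<Batch> expired = new ArrayList<>();
        for (Batch b : batches) {
            if (!b.hasLeader && nowMs - b.createdMs >= metadataTimeoutMs) {
                expired.add(b);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(new Batch(0, false), new Batch(0, true));
        System.out.println(expire(batches, 60_000, 60_000).size());
    }
}
```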

On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
 up this KIP because currently we often see mirror-maker hang when a broker
 is down.

 I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
 timeout to expire the batches which are sitting in accumulator without
 leader info. I did that because the situation there is essentially missing
 metadata.

 As a summary of what I am thinking about the timeout in new Producer:

 1. Metadata timeout:
   - used in send(), blocking
   - used in accumulator to expire batches with timeout exception.
 2. Linger.ms
   - Used in accumulator to ready the batch for drain
 3. Request timeout
   - Used in NetworkClient to expire a batch and retry if no response is
 received for a request before timeout.

 So in this KIP, we only address (3). The only public interface change is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

 Would like to see what people think of above approach?

 Jiangjie (Becket) Qin
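As a sketch, the three timeouts summarized in the quoted message above might
appear in a producer configuration like this. Only metadata.fetch.timeout.ms
and linger.ms already exist; request.timeout.ms and replication.timeout.ms
are merely the names proposed in this thread, and every value is illustrative.

```java
import java.util.Properties;

// Illustrative only: the proposed config names and all values are assumptions.
public class ProposedTimeouts {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.fetch.timeout.ms", "60000"); // (1) bounds send() blocking and leaderless batches
        props.put("linger.ms", "5");                     // (2) how long a batch waits before it is drained
        props.put("request.timeout.ms", "30000");        // (3) expire and retry an unanswered request
        props.put("replication.timeout.ms", "30000");    // renamed from timeout.ms
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```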

 On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

 Jun,
 
 I thought a little bit differently on this.
 Intuitively, I am thinking that if a partition is offline, the metadata
 for that partition should be considered not ready because we don’t know
 which broker we should send the message to. So those sends need to be
 blocked on metadata timeout.
 Another thing I’m wondering is in which scenario an offline partition will
 become online again in a short period of time and how likely it will
 occur. My understanding is that the batch timeout for batches sitting in
 accumulator should be larger than linger.ms but should not be too long
 (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
 with batches to be aborted.
 
 That said, I do agree it is reasonable to buffer the message for some time
 so messages to other partitions can still get sent. But adding another
 expiration in addition to linger.ms - which is essentially a timeout -
 sounds a little bit confusing. Maybe we can do this, let the batch sit in
 accumulator up to linger.ms, then fail it if necessary.
 
 What do you think?
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
 
 Jiangjie,
 
 Allowing messages to be accumulated in an offline partition could be
 useful
 since the partition may become available before the request timeout or
 linger time is reached. Now that we are planning to add a new timeout, it
 would be useful to think through whether/how that applies to messages in
 the accumulator too.
 
 Thanks,
 
 Jun
 
 
 On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi Harsha,
 
  Took a quick look at the patch. I think it is still a little bit
  different. KAFKA-1788 only handles the case where a batch sits in the
  accumulator for too long. The KIP is trying to solve the issue where a
  batch has already been drained from accumulator and sent to broker.
  We might be able to apply timeout on batch level to merge those two
 cases
  as Ewen suggested. But I’m not sure if it is a good idea to allow
 messages
  whose target partition is offline to sit in accumulator in the first
 place.
 
  Jiangjie (Becket) Qin
 
  On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:
 
  Guozhang and Jiangjie,
   Isn’t this work being covered in
  https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please review
  the patch there.
  Thanks,
  Harsha
  
  
  On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
  wrote:
  
  Thanks for the update Jiangjie,
  
  I think it is actually NOT expected that hardware disconnection will
 be
  detected by the selector, but rather will only be revealed upon TCP
  timeout, which could be hours.
  
  A couple of comments on the wiki:
  
  1. For KafkaProducer.close() and KafkaProducer.flush() we need the request
  timeout as an implicit timeout. I am not very clear on what this means?
  
  2. Currently the producer already has a TIMEOUT_CONFIG which should
  really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add
  REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
  it will change the config names but will reduce confusion moving
  forward.
  
  
  Guozhang
  
  
  On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   Checked the code again. It seems that the disconnected channel is
 not
   detected by selector as expected.
  
   Currently we are depending on the
   o.a.k.common.network.Selector.disconnected set to see if we need to
 do
   something for a disconnected channel.
   However Selector.disconnected set is only 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-05 Thread Mayuresh Gharat
Just a quick question, can we handle REQUEST TIMEOUT as disconnections and
do a fresh MetaDataRequest and retry instead of failing the request?


Thanks,

Mayuresh


On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
 up this KIP because currently we often see mirror-maker hang when a broker
 is down.

 I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
 timeout to expire the batches which are sitting in accumulator without
 leader info. I did that because the situation there is essentially missing
 metadata.

 As a summary of what I am thinking about the timeout in new Producer:

 1. Metadata timeout:
   - used in send(), blocking
   - used in accumulator to expire batches with timeout exception.
 2. Linger.ms
   - Used in accumulator to ready the batch for drain
 3. Request timeout
   - Used in NetworkClient to expire a batch and retry if no response is
 received for a request before timeout.

 So in this KIP, we only address (3). The only public interface change is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

 I would like to see what people think of the above approach.

 Jiangjie (Becket) Qin

 On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

 Jun,
 
 I thought a little bit differently on this.
 Intuitively, I am thinking that if a partition is offline, the metadata
 for that partition should be considered not ready because we don’t know
 which broker we should send the message to. So those sends need to be
 blocked on metadata timeout.
 Another thing I’m wondering is in which scenario an offline partition will
 become online again in a short period of time and how likely it will
 occur. My understanding is that the batch timeout for batches sitting in
 accumulator should be larger than linger.ms but should not be too long
 (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
 with batches to be aborted.
 
 That said, I do agree it is reasonable to buffer the message for some time
 so messages to other partitions can still get sent. But adding another
 expiration in addition to linger.ms - which is essentially a timeout -
 sounds a little bit confusing. Maybe we can do this: let the batch sit in
 accumulator up to linger.ms, then fail it if necessary.
 
 What do you think?
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
 
 Jiangjie,
 
 Allowing messages to be accumulated in an offline partition could be
 useful
 since the partition may become available before the request timeout or
 linger time is reached. Now that we are planning to add a new timeout, it
 would be useful to think through whether/how that applies to messages in
 the accumulator too.
 
 Thanks,
 
 Jun
 
 
 On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi Harsha,
 
  Took a quick look at the patch. I think it is still a little bit
  different. KAFKA-1788 only handles the case where a batch sits in the
  accumulator for too long. The KIP is trying to solve the issue where a
  batch has already been drained from the accumulator and sent to the broker.
  We might be able to apply timeout on batch level to merge those two
 cases
  as Ewen suggested. But I’m not sure if it is a good idea to allow
 messages
  whose target partition is offline to sit in accumulator in the first
 place.
 
  Jiangjie (Becket) Qin
 
  On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:
 
  Guozhang and Jiangjie,
   Isn’t this work being covered in
  https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
  review the patch there.
  Thanks,
  Harsha
  
  
  On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
  wrote:
  
  Thanks for the update Jiangjie,
  
  I think it is actually NOT expected that hardware disconnection will
 be
  detected by the selector, but rather will only be revealed upon TCP
  timeout, which could be hours.
  
  A couple of comments on the wiki:
  
  1. For KafkaProducer.close() and KafkaProducer.flush() we need the
  request
  timeout as an implicit timeout. I am not very clear what this means?
  
  2. Currently the producer already has a TIMEOUT_CONFIG which should
  really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
  REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
  it will change the config names but will reduce confusion moving
  forward.
  
  
  Guozhang
  
  
  On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   Checked the code again. It seems that the disconnected channel is
 not
   detected by the selector as expected.
  
   Currently we are depending on the
   o.a.k.common.network.Selector.disconnected set to see if we need to
 do
   something for a disconnected 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-04 Thread Jiangjie Qin
I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
up this KIP because currently mirror maker is very likely to hang when a
broker is down.

I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
timeout to expire the batches which are sitting in accumulator without
leader info. I did that because the situation there is essentially missing
metadata.

As a summary of what I am thinking about the timeout in new Producer:

1. Metadata timeout:
  - used in send(), blocking
  - used in accumulator to expire batches with timeout exception.
2. Linger.ms
  - Used in accumulator to ready the batch for drain
3. Request timeout
  - Used in NetworkClient to expire a batch and retry if no response is
received for a request before timeout.
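
The three timeouts above can be sketched as producer configs. This is an illustrative sketch only: linger.ms is an existing config, metadata.fetch.timeout.ms is the metadata timeout name in the producer of that era, and request.timeout.ms is the new config this KIP proposes — treat the exact names and values as assumptions.

```java
import java.util.Properties;

public class ProducerTimeoutSketch {
    // Illustrative only: maps the three timeouts discussed in this thread
    // to producer config entries. Names and values are assumptions, not a
    // definitive producer configuration.
    public static Properties timeoutProps() {
        Properties props = new Properties();
        // (1) Metadata timeout: bounds blocking in send() and expires
        //     accumulator batches that still have no leader info.
        props.put("metadata.fetch.timeout.ms", "60000");
        // (2) Linger time: how long the accumulator holds a batch before
        //     it becomes ready to drain.
        props.put("linger.ms", "0");
        // (3) Request timeout (proposed by this KIP): how long NetworkClient
        //     waits for a response to an in-flight request before expiring it.
        props.put("request.timeout.ms", "30000");
        return props;
    }
}
```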

So in this KIP, we only address (3). The only public interface change is a
new configuration of request timeout (and maybe change the configuration
name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

I would like to see what people think of the above approach.

Jiangjie (Becket) Qin

On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

Jun,

I thought a little bit differently on this.
Intuitively, I am thinking that if a partition is offline, the metadata
for that partition should be considered not ready because we don’t know
which broker we should send the message to. So those sends need to be
blocked on metadata timeout.
Another thing I’m wondering is in which scenario an offline partition will
become online again in a short period of time and how likely it will
occur. My understanding is that the batch timeout for batches sitting in
accumulator should be larger than linger.ms but should not be too long
(e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
with batches to be aborted.

That said, I do agree it is reasonable to buffer the message for some time
so messages to other partitions can still get sent. But adding another
expiration in addition to linger.ms - which is essentially a timeout -
sounds a little bit confusing. Maybe we can do this: let the batch sit in
accumulator up to linger.ms, then fail it if necessary.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:

Jiangjie,

Allowing messages to be accumulated in an offline partition could be
useful
since the partition may become available before the request timeout or
linger time is reached. Now that we are planning to add a new timeout, it
would be useful to think through whether/how that applies to messages in
the accumulator too.

Thanks,

Jun


On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Harsha,

 Took a quick look at the patch. I think it is still a little bit
 different. KAFKA-1788 only handles the case where a batch sits in the
 accumulator for too long. The KIP is trying to solve the issue where a
 batch has already been drained from the accumulator and sent to the broker.
 We might be able to apply timeout on batch level to merge those two
cases
 as Ewen suggested. But I’m not sure if it is a good idea to allow
messages
 whose target partition is offline to sit in accumulator in the first
place.

 Jiangjie (Becket) Qin

 On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

 Guozhang and Jiangjie,
  Isn’t this work being covered in
 https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
 review the patch there.
 Thanks,
 Harsha
 
 
 On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
 wrote:
 
 Thanks for the update Jiangjie,
 
 I think it is actually NOT expected that hardware disconnection will
be
 detected by the selector, but rather will only be revealed upon TCP
 timeout, which could be hours.
 
 A couple of comments on the wiki:
 
 1. For KafkaProducer.close() and KafkaProducer.flush() we need the
 request
 timeout as an implicit timeout. I am not very clear what this means?
 
 2. Currently the producer already has a TIMEOUT_CONFIG which should
 really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
 REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 it will change the config names but will reduce confusion moving
 forward.
 
 
 Guozhang
 
 
 On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  Checked the code again. It seems that the disconnected channel is
not
  detected by the selector as expected.
 
  Currently we are depending on the
  o.a.k.common.network.Selector.disconnected set to see if we need to
do
  something for a disconnected channel.
  However Selector.disconnected set is only updated when:
  1. A write/read/connect to channel failed.
  2. A Key is canceled
  However when a broker is down before it sends back the response, the
  client seems unable to detect this failure.
 
  I did a simple test below:
  1. Run a selector on one machine and an echo server on another
machine.
 
  Connect a selector to 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-20 Thread Jiangjie Qin
Jun,

I thought a little bit differently on this.
Intuitively, I am thinking that if a partition is offline, the metadata
for that partition should be considered not ready because we don’t know
which broker we should send the message to. So those sends need to be
blocked on metadata timeout.
Another thing I’m wondering is in which scenario an offline partition will
become online again in a short period of time and how likely it will
occur. My understanding is that the batch timeout for batches sitting in
accumulator should be larger than linger.ms but should not be too long
(e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
with batches to be aborted.

That said, I do agree it is reasonable to buffer the message for some time
so messages to other partitions can still get sent. But adding another
expiration in addition to linger.ms - which is essentially a timeout -
sounds a little bit confusing. Maybe we can do this: let the batch sit in
accumulator up to linger.ms, then fail it if necessary.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:

Jiangjie,

Allowing messages to be accumulated in an offline partition could be
useful
since the partition may become available before the request timeout or
linger time is reached. Now that we are planning to add a new timeout, it
would be useful to think through whether/how that applies to messages in
the accumulator too.

Thanks,

Jun


On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Harsha,

 Took a quick look at the patch. I think it is still a little bit
 different. KAFKA-1788 only handles the case where a batch sits in the
 accumulator for too long. The KIP is trying to solve the issue where a
 batch has already been drained from the accumulator and sent to the broker.
 We might be able to apply timeout on batch level to merge those two
cases
 as Ewen suggested. But I’m not sure if it is a good idea to allow
messages
 whose target partition is offline to sit in accumulator in the first
place.

 Jiangjie (Becket) Qin

 On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

 Guozhang and Jiangjie,
  Isn’t this work being covered in
 https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
 review the patch there.
 Thanks,
 Harsha
 
 
 On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
 wrote:
 
 Thanks for the update Jiangjie,
 
 I think it is actually NOT expected that hardware disconnection will be
 detected by the selector, but rather will only be revealed upon TCP
 timeout, which could be hours.
 
 A couple of comments on the wiki:
 
 1. For KafkaProducer.close() and KafkaProducer.flush() we need the
 request
 timeout as an implicit timeout. I am not very clear what this means?
 
 2. Currently the producer already has a TIMEOUT_CONFIG which should
 really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
 REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 it will change the config names but will reduce confusion moving
 forward.
 
 
 Guozhang
 
 
 On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  Checked the code again. It seems that the disconnected channel is not
  detected by the selector as expected.
 
  Currently we are depending on the
  o.a.k.common.network.Selector.disconnected set to see if we need to
do
  something for a disconnected channel.
  However Selector.disconnected set is only updated when:
  1. A write/read/connect to channel failed.
  2. A Key is canceled
  However when a broker is down before it sends back the response, the
  client seems unable to detect this failure.
 
  I did a simple test below:
  1. Run a selector on one machine and an echo server on another
machine.
 
  Connect a selector to an echo server
  2. Send a message to echo server using selector, then let the
selector
  poll() every 10 seconds.
  3. After the server received the message, unplug the cable on the echo
 server.
  4. After waiting for 45 min, the selector still did not detect the
  network failure.
  Lsof on selector machine shows that the TCP connection is still
 considered
  ESTABLISHED.
 
  I’m not sure in this case what we should expect from the
  java.nio.channels.Selector. According to the document, the selector
 does
  not verify the status of the associated channel. In my test case it looks
  even worse: the OS did not consider the socket disconnected.
 
  Anyway. It seems adding the client side request timeout is necessary.
 I’ve
  updated the KIP page to clarify the problem we want to solve
according
 to
  Ewen’s comments.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:
 
 
  On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
  
   Hi Ewen, thanks for the comments. Very good points! Please see
 replies
   inline.
  
  
 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-20 Thread Jun Rao
Jiangjie,

Allowing messages to be accumulated in an offline partition could be useful
since the partition may become available before the request timeout or
linger time is reached. Now that we are planning to add a new timeout, it
would be useful to think through whether/how that applies to messages in
the accumulator too.

Thanks,

Jun


On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Harsha,

 Took a quick look at the patch. I think it is still a little bit
 different. KAFKA-1788 only handles the case where a batch sits in the
 accumulator for too long. The KIP is trying to solve the issue where a
 batch has already been drained from the accumulator and sent to the broker.
 We might be able to apply timeout on batch level to merge those two cases
 as Ewen suggested. But I’m not sure if it is a good idea to allow messages
 whose target partition is offline to sit in accumulator in the first place.

 Jiangjie (Becket) Qin

 On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

 Guozhang and Jiangjie,
  Isn’t this work being covered in
 https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
 review the patch there.
 Thanks,
 Harsha
 
 
 On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
 wrote:
 
 Thanks for the update Jiangjie,
 
 I think it is actually NOT expected that hardware disconnection will be
 detected by the selector, but rather will only be revealed upon TCP
 timeout, which could be hours.
 
 A couple of comments on the wiki:
 
 1. For KafkaProducer.close() and KafkaProducer.flush() we need the
 request
 timeout as an implicit timeout. I am not very clear what this means?
 
 2. Currently the producer already has a TIMEOUT_CONFIG which should
 really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
 REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 
 it will change the config names but will reduce confusion moving
 forward.
 
 
 Guozhang
 
 
 On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Checked the code again. It seems that the disconnected channel is not
  detected by the selector as expected.
 
  Currently we are depending on the
  o.a.k.common.network.Selector.disconnected set to see if we need to do
  something for a disconnected channel.
  However Selector.disconnected set is only updated when:
  1. A write/read/connect to channel failed.
  2. A Key is canceled
  However when a broker is down before it sends back the response, the
  client seems unable to detect this failure.
 
  I did a simple test below:
  1. Run a selector on one machine and an echo server on another machine.
 
  Connect a selector to an echo server
  2. Send a message to echo server using selector, then let the selector
  poll() every 10 seconds.
  3. After the server received the message, unplug the cable on the echo
 server.
  4. After waiting for 45 min, the selector still did not detect the
  network failure.
  Lsof on selector machine shows that the TCP connection is still
 considered
  ESTABLISHED.
 
  I’m not sure in this case what we should expect from the
  java.nio.channels.Selector. According to the document, the selector
 does
  not verify the status of the associated channel. In my test case it looks
  even worse: the OS did not consider the socket disconnected.
 
  Anyway. It seems adding the client side request timeout is necessary.
 I’ve
  updated the KIP page to clarify the problem we want to solve according
 to
  Ewen’s comments.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
 
 
  On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
  
   Hi Ewen, thanks for the comments. Very good points! Please see
 replies
   inline.
  
  
   On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
  wrote:
  
   Jiangjie,
   
   Great start. I have a couple of comments.
   
   Under the motivation section, is it really true that the request
 will
   never
   be completed? Presumably if the broker goes down the connection
 will be
   severed, at worst by a TCP timeout, which should clean up the
  connection
   and any outstanding requests, right? I think the real reason we
 need a
   different timeout is that the default TCP timeouts are ridiculously
 
  long
   in
   this context.
   Yes, when broker is completely down the request should be cleared as
 you
   said. The case we encountered looks like the broker was just not
   responding but TCP connection was still alive though.
  
  
  Ok, that makes sense.
  
  
  
   
   My second question is about whether this is the right level to
 tackle
  the
   issue/what user-facing changes need to be made. A related problem
 came
  up
   in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
   records
   get stuck indefinitely because there's no client-side timeout. This
 KIP
   wouldn't fix 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-17 Thread Mayuresh Gharat
Agreed, we also need to change the code in Sender.java to indicate that
it represents the replication timeout and not the request timeout.

Thanks,

Mayuresh

On Thu, Apr 16, 2015 at 1:08 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Guozhang,

 By implicit timeout for close() and flush(), I meant that currently we
 don’t have an explicit timeout for close() or flush() when a broker is
 down, so they can take pretty long, up to the TCP timeout, which is hours as you
 mentioned. With the client side request timeout, the waiting time would be
 sort of bounded by request timeout.

 And I agree we’d better change the TIMEOUT_CONFIG to
 REPLICATION_TIMEOUT_CONFIG to avoid confusion.

 Thanks.

 Jiangjie (Becket) Qin

 On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote:

 Thanks for the update Jiangjie,
 
 I think it is actually NOT expected that hardware disconnection will be
 detected by the selector, but rather will only be revealed upon TCP
 timeout, which could be hours.
 
 A couple of comments on the wiki:
 
 1. For KafkaProducer.close() and KafkaProducer.flush() we need the
 request
 timeout as an implicit timeout. I am not very clear what this means?
 
 2. Currently the producer already has a TIMEOUT_CONFIG which should
 really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
 REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 it will change the config names but will reduce confusion moving forward.
 
 
 Guozhang
 
 
 On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:
 
  Checked the code again. It seems that the disconnected channel is not
  detected by the selector as expected.
 
  Currently we are depending on the
  o.a.k.common.network.Selector.disconnected set to see if we need to do
  something for a disconnected channel.
  However Selector.disconnected set is only updated when:
  1. A write/read/connect to channel failed.
  2. A Key is canceled
  However when a broker is down before it sends back the response, the
  client seems unable to detect this failure.
 
  I did a simple test below:
  1. Run a selector on one machine and an echo server on another machine.
  Connect a selector to an echo server
  2. Send a message to echo server using selector, then let the selector
  poll() every 10 seconds.
  3. After the server received the message, unplug the cable on the echo
 server.
  4. After waiting for 45 min, the selector still did not detect the
  network failure.
  Lsof on selector machine shows that the TCP connection is still
 considered
  ESTABLISHED.
 
  I’m not sure in this case what we should expect from the
  java.nio.channels.Selector. According to the document, the selector does
  not verify the status of the associated channel. In my test case it looks
  even worse: the OS did not consider the socket disconnected.
 
  Anyway. It seems adding the client side request timeout is necessary.
 I’ve
  updated the KIP page to clarify the problem we want to solve according
 to
  Ewen’s comments.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
  On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
 
  On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
  
   Hi Ewen, thanks for the comments. Very good points! Please see
 replies
   inline.
  
  
   On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
  wrote:
  
   Jiangjie,
   
   Great start. I have a couple of comments.
   
   Under the motivation section, is it really true that the request
 will
   never
   be completed? Presumably if the broker goes down the connection
 will be
   severed, at worst by a TCP timeout, which should clean up the
  connection
   and any outstanding requests, right? I think the real reason we
 need a
   different timeout is that the default TCP timeouts are ridiculously
  long
   in
   this context.
   Yes, when broker is completely down the request should be cleared as
 you
   said. The case we encountered looks like the broker was just not
   responding but TCP connection was still alive though.
  
  
  Ok, that makes sense.
  
  
  
   
   My second question is about whether this is the right level to
 tackle
  the
   issue/what user-facing changes need to be made. A related problem
 came
  up
   in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
   records
   get stuck indefinitely because there's no client-side timeout. This
 KIP
   wouldn't fix that problem or any problems caused by lack of
  connectivity
   since this would only apply to in flight requests, which by
 definition
   must
   have been sent on an active connection.
   
   I suspect both types of problems probably need to be addressed
  separately
   by introducing explicit timeouts. However, because the settings
  introduced
   here are very much about the internal implementations of the
 clients,
  I'm
   wondering if this even needs to be a user-facing setting, especially
  if we
   have to add other 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-16 Thread Jiangjie Qin
Hi Harsha,

Took a quick look at the patch. I think it is still a little bit
different. KAFKA-1788 only handles the case where a batch sits in the
accumulator for too long. The KIP is trying to solve the issue where a
batch has already been drained from the accumulator and sent to the broker.
We might be able to apply timeout on batch level to merge those two cases
as Ewen suggested. But I’m not sure if it is a good idea to allow messages
whose target partition is offline to sit in accumulator in the first place.

Jiangjie (Becket) Qin

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Guozhang and Jiangjie,
 Isn’t this work being covered in
https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
review the patch there.
Thanks,
Harsha


On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather will only be revealed upon TCP
timeout, which could be hours.

A couple of comments on the wiki:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the
request  
timeout as an implicit timeout. I am not very clear what this means?

2. Currently the producer already has a TIMEOUT_CONFIG which should
really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
 
it will change the config names but will reduce confusion moving
forward.  


Guozhang  


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
 
wrote:  

 Checked the code again. It seems that the disconnected channel is not
 detected by the selector as expected.
  
 Currently we are depending on the
 o.a.k.common.network.Selector.disconnected set to see if we need to do
 something for a disconnected channel.
 However Selector.disconnected set is only updated when:
 1. A write/read/connect to channel failed.
 2. A Key is canceled
 However when a broker is down before it sends back the response, the
 client seems unable to detect this failure.
  
 I did a simple test below:
 1. Run a selector on one machine and an echo server on another machine.
 
 Connect a selector to an echo server
 2. Send a message to echo server using selector, then let the selector
 poll() every 10 seconds.
 3. After the server received the message, unplug the cable on the echo
server.  
 4. After waiting for 45 min, the selector still did not detect the
 network failure.
 Lsof on selector machine shows that the TCP connection is still
considered  
 ESTABLISHED.  
  
 I’m not sure in this case what we should expect from the
 java.nio.channels.Selector. According to the document, the selector
does  
 not verify the status of the associated channel. In my test case it looks
 even worse: the OS did not consider the socket disconnected.
  
 Anyway. It seems adding the client side request timeout is necessary.
I’ve  
 updated the KIP page to clarify the problem we want to solve according
to  
 Ewen’s comments.
  
 Thanks.  
  
 Jiangjie (Becket) Qin
  
 On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
 
  
 On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:  
   
  Hi Ewen, thanks for the comments. Very good points! Please see
replies  
  inline.  
   
   
  On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:  
   
  Jiangjie,  

  Great start. I have a couple of comments.

  Under the motivation section, is it really true that the request
will  
  never  
  be completed? Presumably if the broker goes down the connection
will be  
  severed, at worst by a TCP timeout, which should clean up the
 connection  
  and any outstanding requests, right? I think the real reason we
need a  
  different timeout is that the default TCP timeouts are ridiculously
 
 long  
  in  
  this context.
  Yes, when broker is completely down the request should be cleared as
you  
  said. The case we encountered looks like the broker was just not
  responding but TCP connection was still alive though.
   
   
 Ok, that makes sense.
   
   
   

  My second question is about whether this is the right level to
tackle  
 the  
  issue/what user-facing changes need to be made. A related problem
came  
 up  
  in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
  records  
  get stuck indefinitely because there's no client-side timeout. This
KIP  
  wouldn't fix that problem or any problems caused by lack of
 connectivity  
  since this would only apply to in flight requests, which by
definition  
  must  
  have been sent on an active connection.

  I suspect both types of problems probably need to be addressed
 separately  
  by introducing explicit timeouts. However, because the settings
 introduced  
  here are very much about the internal implementations of the
clients,  
 I'm  
  wondering if this even 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-16 Thread Jiangjie Qin
Hi Guozhang,

By implicit timeout for close() and flush(), I meant that currently we
don’t have an explicit timeout for close() or flush() when a broker is
down, so they can take pretty long, up to the TCP timeout, which is hours as you
mentioned. With the client side request timeout, the waiting time would be
sort of bounded by request timeout.
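
A minimal sketch of how such a bound could work (hypothetical names, not the actual NetworkClient code): record the send time of each in-flight request per node, and treat the node as disconnected once the oldest one has waited longer than the request timeout.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of client-side request expiry; not the actual
// NetworkClient implementation.
final class InFlightTracker {
    private final Deque<Long> sendTimesMs = new ArrayDeque<>(); // oldest first
    private final long requestTimeoutMs;

    InFlightTracker(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    void recordSend(long nowMs) {
        sendTimesMs.addLast(nowMs);
    }

    void recordResponse() {
        sendTimesMs.pollFirst();
    }

    // True if the oldest in-flight request has waited longer than the
    // request timeout; the caller would then treat the node as disconnected
    // instead of waiting (possibly hours) for the TCP timeout.
    boolean hasExpiredRequest(long nowMs) {
        Long oldest = sendTimesMs.peekFirst();
        return oldest != null && nowMs - oldest > requestTimeoutMs;
    }
}
```

With a check like this running on every poll() iteration, close() and flush() would be bounded by roughly the request timeout rather than the TCP timeout.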

And I agree we’d better change the TIMEOUT_CONFIG to
REPLICATION_TIMEOUT_CONFIG to avoid confusion.

Thanks.

Jiangjie (Becket) Qin

On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote:

Thanks for the update Jiangjie,

I think it is actually NOT expected that hardware disconnection will be
detected by the selector, but rather will only be revealed upon TCP
timeout, which could be hours.

A couple of comments on the wiki:

1. For KafkaProducer.close() and KafkaProducer.flush() we need the
request
timeout as an implicit timeout. I am not very clear what this means?

2. Currently the producer already has a TIMEOUT_CONFIG which should
really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly
it will change the config names but will reduce confusion moving forward.


Guozhang


On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Checked the code again. It seems that the disconnected channel is not
 detected by the selector as expected.

 Currently we are depending on the
 o.a.k.common.network.Selector.disconnected set to see if we need to do
 something for a disconnected channel.
 However Selector.disconnected set is only updated when:
 1. A write/read/connect to channel failed.
 2. A Key is canceled
 However when a broker is down before it sends back the response, the
 client seems unable to detect this failure.

 I did a simple test below:
 1. Run a selector on one machine and an echo server on another machine.
 Connect a selector to an echo server
 2. Send a message to echo server using selector, then let the selector
 poll() every 10 seconds.
 3. After the server received the message, unplug the cable on the echo
server.
 4. After waiting for 45 min, the selector still did not detect the
 network failure.
 Lsof on selector machine shows that the TCP connection is still
considered
 ESTABLISHED.

 I’m not sure in this case what we should expect from the
 java.nio.channels.Selector. According to the document, the selector does
 not verify the status of the associated channel. In my test case it looks
 even worse: the OS did not consider the socket disconnected.

 Anyway. It seems adding the client side request timeout is necessary.
I’ve
 updated the KIP page to clarify the problem we want to solve according
to
 Ewen’s comments.

 Thanks.

 Jiangjie (Becket) Qin

 On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:
 
  Hi Ewen, thanks for the comments. Very good points! Please see
replies
  inline.
 
 
  On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Jiangjie,
  
  Great start. I have a couple of comments.
  
  Under the motivation section, is it really true that the request
will
  never
  be completed? Presumably if the broker goes down the connection
will be
  severed, at worst by a TCP timeout, which should clean up the
 connection
  and any outstanding requests, right? I think the real reason we
need a
  different timeout is that the default TCP timeouts are ridiculously
 long
  in
  this context.
  Yes, when broker is completely down the request should be cleared as
you
  said. The case we encountered looks like the broker was just not
  responding but TCP connection was still alive though.
 
 
 Ok, that makes sense.
 
 
 
  
  My second question is about whether this is the right level to
tackle
 the
  issue/what user-facing changes need to be made. A related problem
came
 up
  in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
  records
  get stuck indefinitely because there's no client-side timeout. This
KIP
  wouldn't fix that problem or any problems caused by lack of
 connectivity
  since this would only apply to in flight requests, which by
definition
  must
  have been sent on an active connection.
  
  I suspect both types of problems probably need to be addressed
 separately
  by introducing explicit timeouts. However, because the settings
 introduced
  here are very much about the internal implementations of the
clients,
 I'm
  wondering if this even needs to be a user-facing setting, especially
 if we
  have to add other timeouts anyway. For example, would a fixed,
generous
  value that's still much shorter than a TCP timeout, say 15s, be good
  enough? If other timeouts would allow, for example, the clients to
  properly
  exit even if requests have not hit their timeout, then what's the
 benefit
  of being able to configure the request-level timeout?
  That is a very good point. We have three 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-15 Thread Jiangjie Qin
Checked the code again. It seems that the disconnected channel is not
detected by selector as expected.

Currently we are depending on the
o.a.k.common.network.Selector.disconnected set to see if we need to do
something for a disconnected channel.
However, the Selector.disconnected set is only updated when:
1. A write/read/connect to the channel fails.
2. A key is cancelled.
So when a broker is down before it sends back the response, the
client seems unable to detect this failure.

I did a simple test below:
1. Run a selector on one machine and an echo server on another machine.
Connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the
selector poll() every 10 seconds.
3. After the server received the message, unplug the cable on the echo server.
4. After waiting for 45 minutes, the selector still had not detected the
network failure.
lsof on the selector machine shows that the TCP connection is still
considered ESTABLISHED.
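This matches what java.nio guarantees: the Selector only reports I/O events, it never probes whether the peer is still alive. The point can be seen in a tiny self-contained sketch (ordinary JDK code, not Kafka's Selector wrapper; the class and method names here are made up for illustration): with no traffic and no failure, select() simply times out with zero ready keys and the key stays valid, just as in the 45-minute test above.

```java
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Demonstration that a Selector reports nothing for an idle-but-open channel:
// no data and no failure means select() times out and the key remains valid.
public class SelectorLiveness {
    // Returns the number of ready keys after a 200 ms select on an idle channel.
    public static int selectOnceIdle() throws Exception {
        Pipe pipe = Pipe.open();
        Selector selector = Selector.open();
        pipe.source().configureBlocking(false);
        SelectionKey key = pipe.source().register(selector, SelectionKey.OP_READ);
        int ready = selector.select(200);   // nothing to read, nothing failed: just times out
        boolean valid = key.isValid();      // the selector still considers the key healthy
        selector.close();
        pipe.sink().close();
        pipe.source().close();
        return valid ? ready : -1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ready keys on idle channel: " + selectOnceIdle());
    }
}
```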

I’m not sure what we should expect from java.nio.channels.Selector in this
case. According to the documentation, the selector does not verify the
status of the associated channel. In my test case it looks even worse: the
OS did not consider the socket disconnected at all.

Anyway, it seems adding the client-side request timeout is necessary. I’ve
updated the KIP page to clarify the problem we want to solve according to
Ewen’s comments.
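The proposed client-side request timeout amounts to bookkeeping along these lines (an illustrative sketch; InFlightTracker and its method names are made up here, not the actual NetworkClient API): remember when each request was written to the socket, and on every poll expire the ones that have been in flight longer than the timeout, instead of waiting for the TCP stack to notice.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of client-side request expiration: track send times of in-flight
// requests and drop the ones older than the configured request timeout.
public class InFlightTracker {
    // requestId -> time the request was written to the socket, in ms
    private final Map<Integer, Long> sendTimeMs = new LinkedHashMap<>();

    public void onSend(int requestId, long nowMs) {
        sendTimeMs.put(requestId, nowMs);
    }

    public void onResponse(int requestId) {
        sendTimeMs.remove(requestId);
    }

    // Called from every poll(): collect and drop requests older than the
    // timeout, so the caller can fail them locally and refresh metadata
    // instead of waiting hours for a TCP timeout.
    public List<Integer> expire(long nowMs, long requestTimeoutMs) {
        List<Integer> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, Long>> it = sendTimeMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> e = it.next();
            if (nowMs - e.getValue() >= requestTimeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        InFlightTracker t = new InFlightTracker();
        t.onSend(1, 0L);
        System.out.println("expired at t=30000: " + t.expire(30_000L, 30_000L));
    }
}
```

An expired request would then be completed with a client-side timeout error, which is what lets tools stuck on a silent broker fail fast and retry.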

Thanks.

Jiangjie (Becket) Qin

On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Ewen, thanks for the comments. Very good points! Please see replies
 inline.


 On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 Jiangjie,
 
 Great start. I have a couple of comments.
 
 Under the motivation section, is it really true that the request will
 never
 be completed? Presumably if the broker goes down the connection will be
 severed, at worst by a TCP timeout, which should clean up the
connection
 and any outstanding requests, right? I think the real reason we need a
 different timeout is that the default TCP timeouts are ridiculously
long
 in
 this context.
 Yes, when the broker is completely down the request should be cleared as
 you said. In the case we encountered, the broker was just not responding
 but the TCP connection was still alive.


Ok, that makes sense.



 
 My second question is about whether this is the right level to tackle
the
 issue/what user-facing changes need to be made. A related problem came
up
 in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
 records
 get stuck indefinitely because there's no client-side timeout. This KIP
 wouldn't fix that problem or any problems caused by lack of
connectivity
 since this would only apply to in flight requests, which by definition
 must
 have been sent on an active connection.
 
 I suspect both types of problems probably need to be addressed
separately
 by introducing explicit timeouts. However, because the settings
introduced
 here are very much about the internal implementations of the clients,
I'm
 wondering if this even needs to be a user-facing setting, especially
if we
 have to add other timeouts anyway. For example, would a fixed, generous
 value that's still much shorter than a TCP timeout, say 15s, be good
 enough? If other timeouts would allow, for example, the clients to
 properly
 exit even if requests have not hit their timeout, then what's the
benefit
 of being able to configure the request-level timeout?
 That is a very good point. There are three places where we might be able
 to enforce a timeout for a message send:
 1. Before append to the accumulator - handled by the metadata timeout on
 a per-message level.
 2. Batches of messages inside the accumulator - no timeout mechanism now.
 3. Requests of batches after messages leave the accumulator - we have a
 broker-side timeout but no client-side timeout for now.
 My current proposal only addresses (3) but not (2).
 Honestly I do not have a very clear idea about what we should do with (2)
 right now. But I am with you that we should not expose too many
 configurations to users. What I am thinking now to handle (2) is: when
 the user calls send, if we know that a partition is offline, we should
 throw an exception immediately instead of putting the message into the
 accumulator. This would prevent further memory consumption. We might also
 want to fail all the batches in the deque once we find that a partition
 is offline. That said, I feel a timeout might not be quite applicable to
 (2).
 Do you have any suggestion on this?


Right, I didn't actually mean to solve 2 here, but was trying to figure
out
if a solution to 2 would reduce what we needed to do to address 3. (And
depending on how they are implemented, fixing 1 might also address 2). It
sounds like you hit a hang that I wasn't really expecting. This probably
just
means the KIP motivation needs to be a bit clearer about what type of
situation this 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-14 Thread Ewen Cheslack-Postava
Jiangjie,

Great start. I have a couple of comments.

Under the motivation section, is it really true that the request will never
be completed? Presumably if the broker goes down the connection will be
severed, at worst by a TCP timeout, which should clean up the connection
and any outstanding requests, right? I think the real reason we need a
different timeout is that the default TCP timeouts are ridiculously long in
this context.

My second question is about whether this is the right level to tackle the
issue/what user-facing changes need to be made. A related problem came up
in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records
get stuck indefinitely because there's no client-side timeout. This KIP
wouldn't fix that problem or any problems caused by lack of connectivity
since this would only apply to in flight requests, which by definition must
have been sent on an active connection.

I suspect both types of problems probably need to be addressed separately
by introducing explicit timeouts. However, because the settings introduced
here are very much about the internal implementations of the clients, I'm
wondering if this even needs to be a user-facing setting, especially if we
have to add other timeouts anyway. For example, would a fixed, generous
value that's still much shorter than a TCP timeout, say 15s, be good
enough? If other timeouts would allow, for example, the clients to properly
exit even if requests have not hit their timeout, then what's the benefit
of being able to configure the request-level timeout?

I know we have a similar setting, max.in.flight.requests.per.connection,
exposed publicly (which I just discovered is missing from the new producer
configs documentation). But it looks like the new consumer is not exposing
that option, using a fixed value instead. I think we should default to
hiding these implementation values unless there's a strong case for a
scenario that requires customization.

In other words, since the only user-facing change was the addition of the
setting, I'm wondering if we can avoid the KIP altogether by just choosing
a good default value for the timeout.

-Ewen

On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi,

 I just created a KIP to add a request timeout to NetworkClient for new
 Kafka clients.

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

 Comments and suggestions are welcome!

 Thanks.

 Jiangjie (Becket) Qin




-- 
Thanks,
Ewen


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-14 Thread Mayuresh Gharat
I suppose adding this timeout will help. In cases where the broker is not
completely down but stops responding to produce requests, tools like
MirrorMaker will hang since they are waiting for responses. Adding this
timeout enables them to fail the current request and retry with fresh
metadata.

Thanks,

Mayuresh

On Mon, Apr 13, 2015 at 11:19 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 Jiangjie,

 Great start. I have a couple of comments.

 Under the motivation section, is it really true that the request will never
 be completed? Presumably if the broker goes down the connection will be
 severed, at worst by a TCP timeout, which should clean up the connection
 and any outstanding requests, right? I think the real reason we need a
 different timeout is that the default TCP timeouts are ridiculously long in
 this context.

 My second question is about whether this is the right level to tackle the
 issue/what user-facing changes need to be made. A related problem came up
 in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records
 get stuck indefinitely because there's no client-side timeout. This KIP
 wouldn't fix that problem or any problems caused by lack of connectivity
 since this would only apply to in flight requests, which by definition must
 have been sent on an active connection.

 I suspect both types of problems probably need to be addressed separately
 by introducing explicit timeouts. However, because the settings introduced
 here are very much about the internal implementations of the clients, I'm
 wondering if this even needs to be a user-facing setting, especially if we
 have to add other timeouts anyway. For example, would a fixed, generous
 value that's still much shorter than a TCP timeout, say 15s, be good
 enough? If other timeouts would allow, for example, the clients to properly
 exit even if requests have not hit their timeout, then what's the benefit
 of being able to configure the request-level timeout?

 I know we have a similar setting, max.in.flight.requests.per.connection,
 exposed publicly (which I just discovered is missing from the new producer
 configs documentation). But it looks like the new consumer is not exposing
 that option, using a fixed value instead. I think we should default to
 hiding these implementation values unless there's a strong case for a
 scenario that requires customization.

 In other words, since the only user-facing change was the addition of the
 setting, I'm wondering if we can avoid the KIP altogether by just choosing
 a good default value for the timeout.

 -Ewen

 On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  Hi,
 
  I just created a KIP to add a request timeout to NetworkClient for new
  Kafka clients.
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
 
  Comments and suggestions are welcome!
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
 


 --
 Thanks,
 Ewen




-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-14 Thread Ewen Cheslack-Postava
On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Ewen, thanks for the comments. Very good points! Please see replies
 inline.


 On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 Jiangjie,
 
 Great start. I have a couple of comments.
 
 Under the motivation section, is it really true that the request will
 never
 be completed? Presumably if the broker goes down the connection will be
 severed, at worst by a TCP timeout, which should clean up the connection
 and any outstanding requests, right? I think the real reason we need a
 different timeout is that the default TCP timeouts are ridiculously long
 in
 this context.
 Yes, when the broker is completely down the request should be cleared as
 you said. In the case we encountered, the broker was just not responding
 but the TCP connection was still alive.


Ok, that makes sense.



 
 My second question is about whether this is the right level to tackle the
 issue/what user-facing changes need to be made. A related problem came up
 in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
 records
 get stuck indefinitely because there's no client-side timeout. This KIP
 wouldn't fix that problem or any problems caused by lack of connectivity
 since this would only apply to in flight requests, which by definition
 must
 have been sent on an active connection.
 
 I suspect both types of problems probably need to be addressed separately
 by introducing explicit timeouts. However, because the settings introduced
 here are very much about the internal implementations of the clients, I'm
 wondering if this even needs to be a user-facing setting, especially if we
 have to add other timeouts anyway. For example, would a fixed, generous
 value that's still much shorter than a TCP timeout, say 15s, be good
 enough? If other timeouts would allow, for example, the clients to
 properly
 exit even if requests have not hit their timeout, then what's the benefit
 of being able to configure the request-level timeout?
 That is a very good point. There are three places where we might be able
 to enforce a timeout for a message send:
 1. Before append to the accumulator - handled by the metadata timeout on
 a per-message level.
 2. Batches of messages inside the accumulator - no timeout mechanism now.
 3. Requests of batches after messages leave the accumulator - we have a
 broker-side timeout but no client-side timeout for now.
 My current proposal only addresses (3) but not (2).
 Honestly I do not have a very clear idea about what we should do with (2)
 right now. But I am with you that we should not expose too many
 configurations to users. What I am thinking now to handle (2) is: when
 the user calls send, if we know that a partition is offline, we should
 throw an exception immediately instead of putting the message into the
 accumulator. This would prevent further memory consumption. We might also
 want to fail all the batches in the deque once we find that a partition
 is offline. That said, I feel a timeout might not be quite applicable to
 (2).
 Do you have any suggestion on this?


Right, I didn't actually mean to solve 2 here, but was trying to figure out
if a solution to 2 would reduce what we needed to do to address 3. (And
depending on how they are implemented, fixing 1 might also address 2). It
sounds like you hit a hang that I wasn't really expecting. This probably just
means the KIP motivation needs to be a bit clearer about what type of
situation this addresses. The cause of the hang may also be relevant -- if
it was something like a deadlock then that's something that should just be
fixed, but if it's something outside our control then a timeout makes a lot
more sense.


 
 I know we have a similar setting, max.in.flight.requests.per.connection,
 exposed publicly (which I just discovered is missing from the new producer
 configs documentation). But it looks like the new consumer is not exposing
 that option, using a fixed value instead. I think we should default to
 hiding these implementation values unless there's a strong case for a
 scenario that requires customization.
 For the producer, max.in.flight.requests.per.connection really matters. If
 people do not want reordering of messages, they have to use
 max.in.flight.requests.per.connection=1. On the other hand, if throughput
 is more of a concern, it could be set higher. For the new consumer, I
 checked the value and I am not sure if the hard-coded
 max.in.flight.requests.per.connection=100 is the right value. Without the
 response to the previous request, what offsets should be put into the next
 fetch request? It seems to me the value will effectively be one regardless
 of the setting unless we are sending fetch requests to different
 partitions, which does not look like the case.
 Anyway, it looks to be a separate issue, orthogonal to the request timeout.




 In other words, since the only user-facing change was the addition of the
 setting, I'm wondering if we can avoid the KIP 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-14 Thread Jiangjie Qin
Hi Ewen, thanks for the comments. Very good points! Please see replies
inline.


On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Jiangjie,

Great start. I have a couple of comments.

Under the motivation section, is it really true that the request will
never
be completed? Presumably if the broker goes down the connection will be
severed, at worst by a TCP timeout, which should clean up the connection
and any outstanding requests, right? I think the real reason we need a
different timeout is that the default TCP timeouts are ridiculously long
in
this context.
Yes, when the broker is completely down the request should be cleared as
you said. In the case we encountered, the broker was just not responding
but the TCP connection was still alive.


My second question is about whether this is the right level to tackle the
issue/what user-facing changes need to be made. A related problem came up
in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records
get stuck indefinitely because there's no client-side timeout. This KIP
wouldn't fix that problem or any problems caused by lack of connectivity
since this would only apply to in flight requests, which by definition
must
have been sent on an active connection.

I suspect both types of problems probably need to be addressed separately
by introducing explicit timeouts. However, because the settings introduced
here are very much about the internal implementations of the clients, I'm
wondering if this even needs to be a user-facing setting, especially if we
have to add other timeouts anyway. For example, would a fixed, generous
value that's still much shorter than a TCP timeout, say 15s, be good
enough? If other timeouts would allow, for example, the clients to
properly
exit even if requests have not hit their timeout, then what's the benefit
of being able to configure the request-level timeout?
That is a very good point. There are three places where we might be able to
enforce a timeout for a message send:
1. Before append to the accumulator - handled by the metadata timeout on a
per-message level.
2. Batches of messages inside the accumulator - no timeout mechanism now.
3. Requests of batches after messages leave the accumulator - we have a
broker-side timeout but no client-side timeout for now.
My current proposal only addresses (3) but not (2).
Honestly I do not have a very clear idea about what we should do with (2)
right now. But I am with you that we should not expose too many
configurations to users. What I am thinking now to handle (2) is: when the
user calls send, if we know that a partition is offline, we should throw an
exception immediately instead of putting the message into the accumulator.
This would prevent further memory consumption. We might also want to fail
all the batches in the deque once we find that a partition is offline. That
said, I feel a timeout might not be quite applicable to (2).
Do you have any suggestion on this?
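For reference, the three stages map onto producer configuration roughly as follows (a sketch only: "metadata.fetch.timeout.ms" and the broker-side "timeout.ms" are the existing producer settings, while "request.timeout.ms" is just the name proposed in this discussion, not a released config; the values are arbitrary examples):

```java
import java.util.Properties;

// Sketch of where each timeout stage would live in the producer configs.
public class TimeoutStages {
    public static Properties producerTimeouts() {
        Properties props = new Properties();
        // (1) Before append: bounded by the existing metadata fetch timeout.
        props.setProperty("metadata.fetch.timeout.ms", "60000");
        // (2) Batches sitting in the accumulator: no timeout mechanism today.
        // (3) In-flight requests: a broker-side replication timeout exists...
        props.setProperty("timeout.ms", "30000");
        // ...and this KIP proposes a client-side counterpart.
        props.setProperty("request.timeout.ms", "40000");
        return props;
    }

    public static void main(String[] args) {
        producerTimeouts().list(System.out);
    }
}
```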

I know we have a similar setting, max.in.flight.requests.per.connection,
exposed publicly (which I just discovered is missing from the new producer
configs documentation). But it looks like the new consumer is not exposing
that option, using a fixed value instead. I think we should default to
hiding these implementation values unless there's a strong case for a
scenario that requires customization.
For the producer, max.in.flight.requests.per.connection really matters. If
people do not want reordering of messages, they have to use
max.in.flight.requests.per.connection=1. On the other hand, if throughput
is more of a concern, it could be set higher. For the new consumer, I
checked the value and I am not sure if the hard-coded
max.in.flight.requests.per.connection=100 is the right value. Without the
response to the previous request, what offsets should be put into the next
fetch request? It seems to me the value will effectively be one regardless
of the setting unless we are sending fetch requests to different
partitions, which does not look like the case.
Anyway, it looks to be a separate issue, orthogonal to the request timeout.

In other words, since the only user-facing change was the addition of the
setting, I'm wondering if we can avoid the KIP altogether by just choosing
a good default value for the timeout.
The problem is that we have a server-side request timeout exposed as a
public configuration. We cannot set the client timeout smaller than that
value, so a hard-coded value probably won't work here.
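That constraint could be enforced with a simple sanity check at configuration time (a hypothetical sketch, not actual Kafka code; the class and method names are made up):

```java
// Hypothetical startup check capturing the constraint above: the client-side
// request timeout must be at least the broker-side timeout, otherwise the
// client could fail a request locally while the broker is still legitimately
// waiting on replication.
public class TimeoutValidation {
    public static void validate(long requestTimeoutMs, long brokerTimeoutMs) {
        if (requestTimeoutMs < brokerTimeoutMs) {
            throw new IllegalArgumentException(
                "request.timeout.ms (" + requestTimeoutMs
                + ") must be >= timeout.ms (" + brokerTimeoutMs + ")");
        }
    }

    public static void main(String[] args) {
        validate(40_000, 30_000); // consistent configuration, no exception
        System.out.println("timeout configuration is consistent");
    }
}
```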

-Ewen

On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi,

 I just created a KIP to add a request timeout to NetworkClient for new
 Kafka clients.

 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

 Comments and suggestions are welcome!

 Thanks.

 Jiangjie (Becket) Qin




-- 
Thanks,
Ewen



[DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-04-13 Thread Jiangjie Qin
Hi,

I just created a KIP to add a request timeout to NetworkClient for new Kafka 
clients.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

Comments and suggestions are welcome!

Thanks.

Jiangjie (Becket) Qin