Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Quick follow-up on the discussion on KIP-19. For partitionsFor() I think the question was whether to use max.enqueue.block.ms or request.timeout.ms to control the timeout. The proposed plan was to use request.timeout.ms. Alternatively, we could rename max.enqueue.block.ms to max.block.ms and use that instead. The argument for this is that in both cases you are configuring the time the client will block. I think this is sort of a corner case, so I am +1 either way.

-Jay

On Tue, Jun 2, 2015 at 10:07 AM, Ewen Cheslack-Postava e...@confluent.io wrote:
On Mon, Jun 1, 2015 at 9:44 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
On 5/21/15, 5:54 PM, Jiangjie Qin j...@linkedin.com wrote:

Based on the discussion we have had, I just updated the KIP with the following proposal and want to see if there are further comments. The proposal is to have the following four timeouts as the end state:

1. max.buffer.full.block.ms - replaces block.on.buffer.full; the max time to block when the buffer is full.
2. metadata.fetch.timeout.ms - reuse the metadata timeout as the batch timeout, because a batch expiring in the accumulator essentially means metadata is not available.
3. replication.timeout.ms - defines how long a server will wait for the records to be replicated to followers.
4. network.request.timeout.ms - used when the producer sends a request to brokers over a TCP connection; it specifies how long the producer should wait for the response.

With the above approach, we can achieve the following:

* Blocking time for send() is bounded by (1) + (2).
* The time from send() returning until the response is received is generally bounded by linger.ms + (2) + (4), not taking retries into consideration.

So from the user's perspective, send() depends on metadata for a topic and on buffer space. I am not sure users would really care about how long it takes to receive the response, because it is async anyway and there are many factors to consider (retries, linger.ms, retry backoff time, request timeout, etc.). I think these configurations are clear enough for users to understand at first glance. Please let me know what you think. Thanks.

Jiangjie (Becket) Qin
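The arithmetic behind the 5/21 proposal can be sketched as follows. The config names are the KIP's proposed names (not necessarily what shipped), and the values are made up purely for illustration:

```java
import java.util.Properties;

public class ProposedTimeouts {
    // (1) max.buffer.full.block.ms + (2) metadata.fetch.timeout.ms:
    // the proposed bound on how long send() can block.
    static long maxSendBlockMs(Properties p) {
        return Long.parseLong(p.getProperty("max.buffer.full.block.ms"))
             + Long.parseLong(p.getProperty("metadata.fetch.timeout.ms"));
    }

    // linger.ms + (2) + (4): rough bound from send() returning until the
    // response arrives, ignoring retries.
    static long maxResponseMs(Properties p, long lingerMs) {
        return lingerMs
             + Long.parseLong(p.getProperty("metadata.fetch.timeout.ms"))
             + Long.parseLong(p.getProperty("network.request.timeout.ms"));
    }

    public static void main(String[] args) {
        // Illustrative values for the four proposed timeouts (ms).
        Properties p = new Properties();
        p.setProperty("max.buffer.full.block.ms", "5000");    // (1)
        p.setProperty("metadata.fetch.timeout.ms", "10000");  // (2)
        p.setProperty("replication.timeout.ms", "30000");     // (3), broker side
        p.setProperty("network.request.timeout.ms", "15000"); // (4)

        System.out.println(maxSendBlockMs(p));     // 15000
        System.out.println(maxResponseMs(p, 100)); // 25100
    }
}
```

With these sample values, send() can block for at most 15 seconds, and a single (retry-free) produce attempt is acknowledged within roughly 25 seconds of send() returning.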
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Thanks, Jay. max.block.ms looks good. I will update the wiki page.

Jiangjie (Becket) Qin

On 6/2/15, 11:26 AM, Jay Kreps jay.kr...@gmail.com wrote:
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Option 3 seems a lot better than the previous options, especially from the user's perspective. I think it strikes a reasonable balance between control and fewer options, and the only implementation details it exposes are that there is a buffer and that there is a network request. Making the request timeout start only after enqueuing still allows you to compute a maximum timeout for a request by adding the two values, but doesn't have annoying artifacts like sometimes issuing a network request when there's only a fraction of a millisecond left for it to complete.

REQUEST_TIMEOUT_DOC could probably say something about retries, e.g.: "This timeout is per retry, so the maximum time spent waiting for a request to complete will be (retries + 1) * network.request.timeout.ms."

There's also one other use of the metadata fetch timeout, in partitionsFor(). Are we converting that to use MAX_ENQUEUE_TIMEOUT_MS_CONFIG? The naming is a bit awkward, but we need to use something there.

Finally, just a nit, but the naming conventions for the variables are getting inconsistent: some have _MS in them and some don't, and some of the _DOC names are inconsistent with the _CONFIG names.

-Ewen

On Mon, Jun 1, 2015 at 9:44 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
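Ewen's suggested doc wording works out as follows. This is only a sketch of the per-retry semantics; retry.backoff.ms between tries and connection setup time are ignored:

```java
public class RetryBound {
    // The proposed request timeout applies per try, so with N retries
    // there are N + 1 tries in the worst case.
    static long maxRequestWaitMs(int retries, long networkRequestTimeoutMs) {
        return (retries + 1L) * networkRequestTimeoutMs;
    }

    public static void main(String[] args) {
        // 3 retries at a 15s per-try timeout: up to 4 * 15s in total.
        System.out.println(maxRequestWaitMs(3, 15_000)); // 60000
    }
}
```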
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Bump up this thread. After several discussions at LinkedIn, we came up with three options. I have updated the KIP-19 wiki page to summarize the three options and state our preference. We can discuss them in tomorrow's KIP hangout. Please let us know what you think.

Thanks,
Jiangjie (Becket) Qin

On 5/21/15, 5:54 PM, Jiangjie Qin j...@linkedin.com wrote:
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
On 5/19/15, 9:53 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

Agreed with the above. I'm also extremely wary of configs that are inherently unintuitive, or that can interact to yield unintuitive behavior. OTOH I think it is okay if a config is categorized as advanced or requires deeper knowledge of the internals of the producer (or of the configured system in general) - i.e., as long as we think long and hard and agree on necessity (driven by clear use cases) before adding such configs. We should also consider how we can simplify or even eliminate existing configs. Re: requests in flight - this may be a good example: Becket had given a valid use case, i.e., supporting strict ordering. Maybe we can replace it with an enable.strict.ordering config, which is clearer in intent; it would internally allow only one in-flight request per partition, and default to a fixed number of in-flight requests (say, five or ten) if set to false. If we implement idempotence then we won't even need that.

On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Jay,

I updated what I think in the KIP wiki. Just a short summary here. We need timeouts for:
1. send()
2. batches in the accumulator
3. requests in flight
That means we need at least three configurations if we do not reuse configurations. I think we probably also want to separate the configurations for exception handling from those for SLA purposes. My understanding is that the configurations we are discussing here are for exception handling, not for SLAs. It looks to me that exception handling is more component oriented, while an SLA is more a matter of systematic tuning. What you suggested sounds more like setting configurations to meet a user-defined SLA, and I am not sure that is what we want to do here. Thanks.

Jiangjie (Becket) Qin

On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah, I think linger.ms remains separate; setting that is a performance optimization rather than a failure-handling thing. We should ideally sanity check this in my proposal, though, since if they set linger.ms > request.timeout then that won't work. It's true that in my proposal the actual replication timeout we set on the request would be non-deterministic. However, the flip side of that argument is that in the existing proposal the actual time until an acknowledgement is non-deterministic, right? So the argument I am trying to construct is that the two things the user cares about are the time to block and the time to ack, and any other timeout we use internally is basically an implementation detail of ensuring those. Your point about the difference between batches and requests is a good one; I hadn't thought of that. So to make my proposal work we would need to do something like base the request time off the oldest batch. Let me think about the implications of that - it's definitely a problem.

-Jay

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
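Joel's enable.strict.ordering is only a suggested name in this thread, but its intent can be sketched as an expansion onto the producer's existing in-flight-request limit (mapped here onto the real per-connection config, although the mail describes a per-partition variant; the non-strict default of five is the thread's own example):

```java
import java.util.Properties;

public class OrderingConfig {
    // Hypothetical expansion of the suggested enable.strict.ordering flag:
    // strict ordering pins the producer to one in-flight request, so a
    // retried batch cannot leapfrog a later one; "false" falls back to a
    // fixed default number of in-flight requests.
    static Properties expand(boolean strictOrdering) {
        Properties p = new Properties();
        p.setProperty("max.in.flight.requests.per.connection",
                strictOrdering ? "1" : "5");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(expand(true).getProperty("max.in.flight.requests.per.connection"));  // 1
        System.out.println(expand(false).getProperty("max.in.flight.requests.per.connection")); // 5
    }
}
```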
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hey Ewen,

I agree with you that we should avoid exposing any unnecessary configuration to the user, with necessity defined by use cases. I also agree that configurations should be named from the user's perspective and match intuition - for example, like what Joel said, something like enable.strict.ordering. That is exactly why I think it makes sense to just use the metadata timeout to expire batches in the accumulator, without adding another configuration.

I still have some concerns over having only one timeout configuration after messages are appended to the accumulator. IMO, this will cause problems in some scenarios. What do we expect the user to set that value to? If a batch stayed in the accumulator for a while and only, say, 1 second is left before the timeout, are we going to send it or not? From what I can see, the network request timeout is there to give enough slack for a broker to respond a little slowly while still avoiding sticking to a dead broker for too long. It should be independent of other settings. If we agree on this, then that indicates we need a *stand-alone* network.request.timeout.ms.

If we really want to give the user some kind of guarantee on the blocking time of send() and minimize the configurations, I feel it makes more sense to have a timeout that includes both the time blocked in send() and the time spent in the accumulator. But this runs into a similar question: what if send() blocked for some time and there is only 1 second left when we put the message into the accumulator? Also, it is not clear to me how one configuration would cover all the timeouts, because they apply to different entities, specifically:
1. in send() - messages
2. in the accumulator - batches
3. in NetworkClient - requests

I think there are two ways to think about the configurations:
1. Configure by steps, and let the user specify how long they want to wait for each step - I think this is what you and Jay are opposing, and I agree.
2. Tell the user what things are required for a message to be sent, and let them set how long they are willing to wait on each of them. So we have metadata, buffer pool, request timeout, and replication timeout.

I prefer the second way. In this case we can have:
A. metadata.timeout.ms - used in send() and the accumulator
B. blocking.on.buffer.full.ms - used in send()
C. network.request.timeout.ms - used in NetworkClient
D. replication.timeout.ms - used on the broker

A send() will block for at most A + B, and the send-to-response time can be at most A + B + C + D. It is very clear to the user what they are configuring. Arguably we are exposing internals to the user, but that is better than presenting an ambiguity and later having to explain the internal details separately. Thanks.

Jiangjie (Becket) Qin

On 5/19/15, 9:53 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
The fact that I understand the producer internals and am still struggling to understand the implications of the different settings, how I would set them, and how they potentially interact such that I could set invalid combinations seems like a red flag to me... Being able to say I want produce requests to timeout in 5s shouldn't require adjusting 3 or 4 configs if the defaults would normally timeout out in something like 30s. Setting aside compatibility issues and focusing on the best set of configs, I agree with Jay that there are two things I actually want out of the API. The key thing is a per-request timeout, which should be enforced client side. I would just expect this to follow the request through any internals so it can be enforced no matter where in the pipeline the request is. Within each component in the pipeline we might have to compute how much time we have left for the request in order to create a timeout within that setting. The second setting is to bound the amount of time spent blocking on send(). This is really an implementation detail, but one that people are complaining about enough that it seems worthwhile to provide control over it (and fixing it would just make that setting superfluous, not break anything). Exposing a lot more settings also exposes a lot about the implementation and makes it harder to improve the implementation in the future, but I don't think we have listed good use cases for setting each of them individually. Why would the user specifically care about how much time the request spends in the accumulator vs. some other component (assuming they have the overall timeout)? Same for requests in flight, as long as I have that client side timeout? And if they care about what component is the bottleneck, could that be better exposed by the exceptions that are returned rather than a ton of different settings? On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I updated what I think int KIP wiki. 
Just a short summary here. Because we need timeout for: 1. Send() 2. Batches in accumulator 3. Requests in flight. That means we need to have at least three configurations if we do not reuse configurations. I think we probably want to also separate the configurations for exception handling and SLA purposes as well. My understanding of the configurations we are discussing here is they are for exception handling but not for SLA purposes. It looks to me that exception handling is more component oriented while SLA is more of systematic tuning. What you suggested sounds more like to set configurations to meet a user defined SLA. I am not sure if this is the things we want to do here. Thanks. Jiangjie (Becket) Qin On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I think linger.ms remains separate, setting that is a performance optimization rather than failure handling thing. We should ideally sanity check this, though, in my proposal, since if they set linger.ms request.timeout then that won't work. It's true that in my proposal that the actual replication timeout we set on the request would be non-deterministic. However the flip side of that argument is that in the existing proposal the actual time until an acknowledgement is non-deterministic, right? So I think the argument I am trying to construct is that the two things the user cares about are the time to block and the time to ack and any other timeout we use internally is basically an implementation detail of ensuring this. Your point about the difference between batches and requests is a good one. I hadn't thought of that. So to make my proposal work we would need to do something like base the request time off the oldest batch. Let me think about the implications of that, it's definitely a problem. -Jay On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Jay, That is also a viable solution. 
I think the main purpose is to let the user know how long they can block, which is important. I have some questions about the proposal, though. Will the user still need to set linger.ms? Will request timeout cover linger.ms as well? My concern with letting request timeout also cover the time spent in the accumulator is that this will make the actual request timeout non-deterministic. Also, implementation wise, a request can have multiple batches, and the time spent in the accumulator could vary a lot. If one of the batches times out, what should we do with the rest of the batches? I think we probably want to separate batch timeout and request timeout. Maybe we can do this: max.send.block.ms, request.timeout, batch.timeout, replication.timeout. So in send() we use max.send.block.ms only. In the accumulator we use batch.timeout; in NetworkClient we use request.timeout. Replication timeout is needed anyway. This looks more understandable from what I can see. What do you think?
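The per-request deadline Ewen describes above — a timeout that follows the request through the pipeline, with each component computing how much time it has left — can be sketched roughly as follows. The class and names are illustrative, not the actual producer internals:

```python
import time

class RecordContext:
    """Illustrative: carries one absolute deadline with a record
    through every pipeline stage (send, accumulator, network)."""

    def __init__(self, timeout_ms):
        self.deadline = time.monotonic() + timeout_ms / 1000.0

    def remaining_ms(self):
        """Time left before the overall request timeout expires."""
        return max(0.0, (self.deadline - time.monotonic()) * 1000.0)

    def expired(self):
        return self.remaining_ms() == 0.0

# Each stage budgets its wait from the single deadline instead of
# owning a separate config: the accumulator would wait at most
# ctx.remaining_ms() for buffer space, and the network layer would
# use whatever is left for the in-flight request.
ctx = RecordContext(timeout_ms=5000)
assert ctx.remaining_ms() <= 5000
assert not ctx.expired()
```

The design choice here is that only one user-facing number exists; per-component timeouts fall out of the deadline arithmetic rather than configuration.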
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hey Mayuresh, I think our purpose is to find the simplest user interface that still supports all the flexibility users might need. In that sense, we have to expose 1. linger.ms for batching purposes, 2. request timeout to support different RTTs, 3. some blocking timeout for send(), etc. So having one single timeout might not provide the flexibility we want to give users. I’ll revise the wiki and maybe we can go from there. Thanks. Jiangjie (Becket) Qin On 5/19/15, 12:51 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: IMO, having 4 different timeouts makes it confusing for the user and it requires the client to understand the internals of Kafka. We should have a single timeout from the user's perspective and handle other timeouts internally, like a batch timeout. Mayuresh On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Jay, That is also a viable solution. I think the main purpose is to let the user know how long they can block, which is important. I have some questions about the proposal, though. Will the user still need to set linger.ms? Will request timeout cover linger.ms as well? My concern with letting request timeout also cover the time spent in the accumulator is that this will make the actual request timeout non-deterministic. Also, implementation wise, a request can have multiple batches, and the time spent in the accumulator could vary a lot. If one of the batches times out, what should we do with the rest of the batches? I think we probably want to separate batch timeout and request timeout. Maybe we can do this: max.send.block.ms, request.timeout, batch.timeout, replication.timeout. So in send() we use max.send.block.ms only. In the accumulator we use batch.timeout; in NetworkClient we use request.timeout. Replication timeout is needed anyway. This looks more understandable from what I can see. What do you think? 
Jiangjie (Becket) Qin On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote: So the alternative to consider would be to instead have max.block.ms (or something), request.timeout, replication.timeout. I think this better captures what the user cares about. Here is how it would work. *max.send.block.ms* is the bound on the maximum time the producer.send() call can block. This subsumes the existing metadata timeout use case but not the proposed use for the time in the accumulator. It *also* acts as a bound on the time you can block on BufferPool allocation (we'd have to add this but that should be easy). *request.timeout* is the bound on the time after send() completes until you get an acknowledgement. This covers the connection timeout and the time in the accumulator. So to implement this, the time we set in the request sent via NetworkClient would have already subtracted off the time spent in the accumulator, and if the request is retried we would include both the time in the accumulator and the time taken for the first request, etc. In other words this is the upper bound on the time to the Future being satisfied. *replication.timeout* will default to something reasonable but maybe you can override it if you want? Thoughts? -Jay On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: So what I understand is that we would have three timeouts: 1) replication timeout 2) request timeout 3) metadata timeout (existing). The request timeout has to be greater than the replication timeout. Request timeout is for messages already sent to Kafka that the producer is waiting for. Thanks, Mayuresh On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I think this looks good. What I think is missing is an overview of the timeouts from the user's perspective. My worry is that it is quite complicated to reason about the current set of timeouts. 
Currently we have: timeout.ms, metadata.fetch.timeout.ms. The proposed settings I think are: batch.expiration.ms, request.timeout.ms, replication.timeout.ms. I think maybe we can skip the batch.expiration.ms. Instead maybe we can somehow combine these into a single request timeout so that we subtract the time you spent waiting from the request timeout and/or replication timeout somehow? I don't have an explicit proposal but my suspicion is that from the user's point of view there is just one timeout related to the request after which they don't care, and we can split that up between the batch time and the request time. Thoughts? How are we handling connection timeouts? If a machine hard fails in the middle of connection establishment there will be no outstanding requests. I think this may be okay because connections are established when we want to send a request and presumably we will begin the timer then? To that end I suggest we do two things:
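Jay's idea of basing the request time off the oldest batch — so that a multi-batch request's network timeout reflects time already spent in the accumulator — might look something like this sketch. The helper functions and the dict-based batch representation are hypothetical, not the real NetworkClient code:

```python
import time

def request_timeout_ms(batch_created_at, total_timeout_ms, now=None):
    """Remaining budget for a network request whose oldest batch was
    created at `batch_created_at` (seconds on a monotonic clock)."""
    now = time.monotonic() if now is None else now
    elapsed_ms = (now - batch_created_at) * 1000.0
    return max(0.0, total_timeout_ms - elapsed_ms)

def timeout_for_request(batches, total_timeout_ms, now=None):
    """A request can carry several batches; its timeout is driven by
    the oldest one, so no batch waits longer than the total budget."""
    oldest = min(b["created_at"] for b in batches)
    return request_timeout_ms(oldest, total_timeout_ms, now)

# A batch that has already spent 400 ms of a 1000 ms budget in the
# accumulator leaves only 600 ms for the network request:
batches = [{"created_at": 0.0}, {"created_at": 0.3}]
assert timeout_for_request(batches, 1000, now=0.4) == 600.0
```

This also shows the problem Jay concedes: the network timeout actually applied becomes non-deterministic, since it depends on how long the oldest batch happened to sit in the accumulator.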
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Jay, If I am understanding this correctly, we should treat the batch timeout separately from the request timeout. Request timeout can be used only for in-flight requests (requests that have been sent to Kafka brokers and are waiting for a response), and batch timeout can be used to expire the batches in the accumulator and return a failure to the client. Mayuresh On Tue, May 19, 2015 at 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I think linger.ms remains separate, setting that is a performance optimization rather than a failure handling thing. We should ideally sanity check this, though, in my proposal, since if they set linger.ms > request.timeout then that won't work. It's true that in my proposal the actual replication timeout we set on the request would be non-deterministic. However the flip side of that argument is that in the existing proposal the actual time until an acknowledgement is non-deterministic, right? So I think the argument I am trying to construct is that the two things the user cares about are the time to block and the time to ack, and any other timeout we use internally is basically an implementation detail of ensuring this. Your point about the difference between batches and requests is a good one. I hadn't thought of that. So to make my proposal work we would need to do something like base the request time off the oldest batch. Let me think about the implications of that, it's definitely a problem. -Jay On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Jay, That is also a viable solution. I think the main purpose is to let the user know how long they can block, which is important. I have some questions about the proposal, though. Will the user still need to set linger.ms? Will request timeout cover linger.ms as well? My concern with letting request timeout also cover the time spent in the accumulator is that this will make the actual request timeout non-deterministic. 
Also, implementation wise, a request can have multiple batches, and the time spent in the accumulator could vary a lot. If one of the batches times out, what should we do with the rest of the batches? I think we probably want to separate batch timeout and request timeout. Maybe we can do this: max.send.block.ms, request.timeout, batch.timeout, replication.timeout. So in send() we use max.send.block.ms only. In the accumulator we use batch.timeout; in NetworkClient we use request.timeout. Replication timeout is needed anyway. This looks more understandable from what I can see. What do you think? Jiangjie (Becket) Qin On 5/19/15, 11:48 AM, Jay Kreps jay.kr...@gmail.com wrote: So the alternative to consider would be to instead have max.block.ms (or something), request.timeout, replication.timeout. I think this better captures what the user cares about. Here is how it would work. *max.send.block.ms* is the bound on the maximum time the producer.send() call can block. This subsumes the existing metadata timeout use case but not the proposed use for the time in the accumulator. It *also* acts as a bound on the time you can block on BufferPool allocation (we'd have to add this but that should be easy). *request.timeout* is the bound on the time after send() completes until you get an acknowledgement. This covers the connection timeout and the time in the accumulator. So to implement this, the time we set in the request sent via NetworkClient would have already subtracted off the time spent in the accumulator, and if the request is retried we would include both the time in the accumulator and the time taken for the first request, etc. In other words this is the upper bound on the time to the Future being satisfied. *replication.timeout* will default to something reasonable but maybe you can override it if you want? Thoughts? 
-Jay 
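Treating the batch timeout as a separate, accumulator-side expiry, as Mayuresh suggests above, amounts to periodically failing batches that have sat too long. A minimal illustrative sketch, not the actual RecordAccumulator logic:

```python
import time

def expire_batches(batches, batch_timeout_ms, now=None):
    """Split queued batches into (still_valid, expired). Expired
    batches would be failed back to the client with a timeout error
    instead of ever being put into a network request."""
    now = time.monotonic() if now is None else now
    valid, expired = [], []
    for b in batches:
        age_ms = (now - b["created_at"]) * 1000.0
        (expired if age_ms >= batch_timeout_ms else valid).append(b)
    return valid, expired

# With a 500 ms batch timeout, a batch queued 1000 ms ago expires
# while one queued 50 ms ago stays eligible for sending:
queued = [{"created_at": 0.0}, {"created_at": 0.95}]
valid, expired = expire_batches(queued, batch_timeout_ms=500, now=1.0)
assert len(expired) == 1 and len(valid) == 1
```

This keeps the in-flight request timeout independent: the network layer never has to reason about how long a batch sat in the accumulator.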
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
So what I understand is that we would have three timeouts: 1) replication timeout 2) request timeout 3) metadata timeout (existing). The request timeout has to be greater than the replication timeout. Request timeout is for messages already sent to Kafka that the producer is waiting for. Thanks, Mayuresh On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I think this looks good. What I think is missing is an overview of the timeouts from the user's perspective. My worry is that it is quite complicated to reason about the current set of timeouts. Currently we have: timeout.ms, metadata.fetch.timeout.ms. The proposed settings I think are: batch.expiration.ms, request.timeout.ms, replication.timeout.ms. I think maybe we can skip the batch.expiration.ms. Instead maybe we can somehow combine these into a single request timeout so that we subtract the time you spent waiting from the request timeout and/or replication timeout somehow? I don't have an explicit proposal but my suspicion is that from the user's point of view there is just one timeout related to the request after which they don't care, and we can split that up between the batch time and the request time. Thoughts? How are we handling connection timeouts? If a machine hard fails in the middle of connection establishment there will be no outstanding requests. I think this may be okay because connections are established when we want to send a request and presumably we will begin the timer then? To that end I suggest we do two things: 1. Include KAFKA-1788. I know that technically these two things are different but from the user's point of view they aren't. 2. Include in the KIP the explanation to the user of the full set of timeouts, what they mean, how we will default them, and when to override which. I know this is a hassle but I think the end experience will be a lot better if we go through this thought process. 
-Jay On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I modified the wiki page to incorporate the feedback from the mailing list and KIP hangout. - Added the deprecation plan for TIMEOUT_CONFIG - Added the actions to take after request timeout I finally chose to create a new connection if requests time out. The reason is: 1. In most cases, if a broker is just slow, as long as we set request timeout to a reasonable value, we should not see many new connections get created. 2. If a broker is down, hopefully metadata refresh will find the new broker and we will not try to reconnect to that broker anymore. Comments are welcome! Thanks. Jiangjie (Becket) Qin On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: +1 Becket. That would give enough time for clients to move. We should make this change very clear. Thanks, Mayuresh On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, Very good summary about the compatibility. What you proposed makes sense. So basically we can do the following: In the next release, i.e. 0.8.3: 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”) 2. Mark TIMEOUT_CONFIG as deprecated 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is defined and give a warning about deprecation. In the release after 0.8.3, we remove TIMEOUT_CONFIG. This should give enough buffer for this change. Request timeout is a completely new thing we are adding to fix a bug; I’m with you that it does not make sense to have it maintain the old buggy behavior. So we can set it to a reasonable value instead of infinite. Jiangjie (Becket) Qin On 5/12/15, 12:03 PM, Ewen Cheslack-Postava e...@confluent.io wrote: I think my confusion is coming from this: So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). 
There are 3 possible compatibility issues I see here: * I assumed this meant the constants also change, so timeout.ms becomes replication.timeout.ms. This breaks config files that worked on the previous version and the only warning would be in the release notes. We do warn about unused configs so they might notice the problem. * Binary and source compatibility if someone configures their client in code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing jars to break if you try to run against an updated client (which seems not very significant since I doubt people upgrade these without recompiling but maybe I'm wrong about that). And it breaks builds without having deprecated that field first, which again, is probably not the biggest issue but is annoying for users and when we accidentally changed the
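The migration plan quoted above — keep honoring the deprecated TIMEOUT_CONFIG for a release, warn, and let it override REPLICATION_TIMEOUT_CONFIG — boils down to a fallback at config-resolution time. A rough sketch, with an assumed default value, not the actual ProducerConfig code:

```python
import warnings

TIMEOUT_CONFIG = "timeout.ms"                      # deprecated key
REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms"
DEFAULT_REPLICATION_TIMEOUT_MS = 30000             # assumed default, for the sketch only

def resolve_replication_timeout(user_config):
    """If the deprecated key is set, it overrides the new one (with a
    deprecation warning); otherwise use the new key or the default."""
    if TIMEOUT_CONFIG in user_config:
        warnings.warn(
            f"{TIMEOUT_CONFIG} is deprecated; use {REPLICATION_TIMEOUT_CONFIG}",
            DeprecationWarning,
        )
        return user_config[TIMEOUT_CONFIG]
    return user_config.get(REPLICATION_TIMEOUT_CONFIG,
                           DEFAULT_REPLICATION_TIMEOUT_MS)

# Old configs keep working (and win, so upgrades don't silently change
# behavior); new configs are picked up when the old key is absent.
assert resolve_replication_timeout({"timeout.ms": 10000,
                                    "replication.timeout.ms": 20000}) == 10000
assert resolve_replication_timeout({"replication.timeout.ms": 20000}) == 20000
```

This addresses the first compatibility issue Ewen lists: an existing config file with timeout.ms still takes effect for one release, with a visible warning, before the key is removed.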
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Here is the concern I had with reusing the metadata.fetch.timeout.ms: Previously people were using this as a bound on the time send() would block. It isn't a bound on the time we will wait on a metadata request, just the time the send call will block if metadata is missing for the topic. We told people who wanted a guarantee of no blocking to basically preinitialize metadata and set this timeout to 0. However I think now this will have a slightly different side effect, which is to kill any request immediately for a leaderless partition even though that request is safely buffered in the record accumulator and no blocking will occur. People using the setting in the original way would now get a bit of a surprise. This may actually be okay and there is always a tradeoff between simplicity and control. -Jay 
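The metadata.fetch.timeout.ms semantics Jay describes — bounding only the time send() blocks waiting for missing metadata, with 0 meaning fail fast — could be sketched like this. The function and exception are illustrative, not the real waitOnMetadata:

```python
import time

class MetadataTimeout(Exception):
    pass

def wait_on_metadata(has_metadata, max_block_ms, poll_interval_s=0.01):
    """Block until metadata for the topic is available, or raise after
    max_block_ms. With max_block_ms == 0, a send for a topic with no
    cached metadata fails immediately instead of blocking."""
    deadline = time.monotonic() + max_block_ms / 1000.0
    while not has_metadata():
        if time.monotonic() >= deadline:
            raise MetadataTimeout("metadata not available in time")
        time.sleep(poll_interval_s)

# Preinitialized metadata plus a 0 ms timeout: send() never blocks here.
wait_on_metadata(lambda: True, max_block_ms=0)
```

This also illustrates the surprise Jay points out: under the new semantics a 0 ms setting would fail a leaderless-partition send immediately, even though buffering it would not have blocked the caller.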
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
So the alternative to consider would be to instead have max.block.ms (or something) request.timeout replication.timeout I think this better captures what the user cares about. Here is how it would work. *max.send.block.ms http://max.send.block.ms* is the bound on the maximum time the producer.send() call can block. This subsumes the existing metadata timeout use case but not the proposed use for the time in the accumulator. It *also* acts as a bound on the time you can block on BufferPool allocation (we'd have to add this but that should be easy). *request.timeout* is the bound on the time after send() complete until you get an acknowledgement. This covers the connection timeout, and the time in the accumulator. So to implement this, the time we set in the request sent via NetworkClient would have already subtracted off the time spent in the accumulator, and if the request retried we would include both the time in the accumulator an the time taken for the first request, etc. In other words this is the upper bound on the time to the Future being satisfied. *replication.timeout* will default to something reasonable but maybe you can override it if you want? Thoughts? -Jay On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat gharatmayures...@gmail.com wrote: So what I understand is that, we would have 3 time outs : 1) replication timeout 2) request timeout 3) metadata timeout (existing) The request timeout has to be greater than the replication timeout. request timeout is for messages already sent to kafka and the producer is waiting for them. Thanks, Mayuresh On Tue, May 19, 2015 at 11:12 AM, Jay Kreps jay.kr...@gmail.com wrote: I think this looks good. What I think is missing is an overview of the timeouts from the user's perspective. My worry is that it is quite complicated to reason about the current set of timeouts. 
Currently we have timeout.ms metadata.fetch.timeout.ms The proposed settings I think are: batch.expiration.ms request.timeout.ms replication.timeout.ms I think maybe we can skip the batch.expiration.ms. Instead maybe we can somehow combine these into a single request timeout so that we subtract the time you spent waiting from the request timeout and/or replication timeout somehow? I don't have an explicit proposal but my suspicion is that from the user's point of view there is just one timeout related to the request after which they don't care, and we can split that up between the batch time and the request time. Thoughts? How are we handling connection timeouts? If a machine hard fails in the middle of connection establishment there will be no outstanding requests. I think this may be okay because connections are established when we want to send a request and presumably we will begin the timer then? To that end I suggest we do two things: 1. Include KAKFA-1788. I know that technically these two things are different but from the user's point of view they aren't. 2. Include in the KIP the explanation to the user of the full set of timeouts, what they mean, how we will default them, and when to override which. I know this is a hassle but I think the end experience will be a lot better if we go through this thought process. -Jay On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I modified the WIKI page to incorporate the feedbacks from mailing list and KIP hangout. - Added the deprecation plan for TIMEOUT_CONFIG - Added the actions to take after request timeout I finally chose to create a new connection if requests timeout. The reason is: 1. In most cases, if a broker is just slow, as long as we set request timeout to be a reasonable value, we should not see many new connections get created. 2. If a broker is down, hopefully metadata refresh will find the new broker and we will not try to reconnect to the broker anymore. 
Comments are welcome! Thanks. Jiangjie (Becket) Qin On 5/12/15, 2:59 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: +1 Becket. That would give enough time for clients to move. We should make this change very clear. Thanks, Mayuresh On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Ewen, Very good summary about the compatibility. What you proposed makes sense. So basically we can do the following: In the next release, i.e. 0.8.3: 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”) 2. Mark TIMEOUT_CONFIG as deprecated 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is defined and give a warning about deprecation. In the release after 0.8.3, we remove TIMEOUT_CONFIG. This should give enough buffer for this change. Request timeout is a completely new thing we add to fix a bug; I’m with you that it does not make sense to have it maintain the old buggy behavior, so we can set it to a reasonable value instead of infinite.
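The bookkeeping Jay describes for request.timeout earlier in the thread (subtract the time already spent in the accumulator before handing the timeout to NetworkClient, and on a retry also subtract the time consumed by earlier attempts) might look roughly like the sketch below. The class and method names here are hypothetical, not actual producer code.

```java
public class RequestTimeoutBudget {
    /**
     * Hypothetical helper: the timeout handed to the network layer is the
     * configured request.timeout minus the time already spent in the
     * accumulator and, on a retry, minus the time used by prior attempts.
     */
    static long remainingMs(long requestTimeoutMs, long accumulatorMs, long priorAttemptsMs) {
        return Math.max(0, requestTimeoutMs - accumulatorMs - priorAttemptsMs);
    }

    public static void main(String[] args) {
        // 30s total budget, 5s spent batching, retrying after a 10s failed first attempt
        System.out.println(remainingMs(30_000, 5_000, 10_000)); // prints 15000
    }
}
```

This makes request.timeout an upper bound on the time to the Future being satisfied, as described, rather than a per-network-request bound.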
Hey Jay, That is also a viable solution. I think the main purpose is to let the user know how long they can block, which is important. I have some questions about the proposal, though. Will the user still need to set linger.ms? Will the request timeout cover linger.ms as well? My concern with letting the request timeout also cover the time spent in the accumulator is that it makes the actual request timeout nondeterministic. Also, implementation-wise, a request can have multiple batches, and the time spent in the accumulator could vary a lot among them. If one of the batches times out, what should we do with the rest of the batches? I think we probably want to separate the batch timeout from the request timeout. Maybe we can do this: max.send.block.ms, request.timeout, batch.timeout, replication.timeout. So in send() we use max.send.block.ms only. In the accumulator we use batch.timeout, and in NetworkClient we use request.timeout. The replication timeout is needed anyway. This looks more understandable from what I can see. What do you think? Jiangjie (Becket) Qin
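Jiangjie's four-timeout counterproposal could be written out as producer configuration like the sketch below. The config names come straight from the proposal and are not finalized Kafka configs; the values are illustrative defaults only.

```java
import java.util.Properties;

public class ProposedProducerTimeouts {
    // Config names are from the proposal above, not finalized Kafka configs;
    // the values are illustrative defaults only.
    static Properties proposedDefaults() {
        Properties props = new Properties();
        props.setProperty("max.send.block.ms", "60000");      // bound on send() blocking
        props.setProperty("batch.timeout.ms", "30000");       // expiry while in the accumulator
        props.setProperty("request.timeout.ms", "30000");     // per in-flight network request
        props.setProperty("replication.timeout.ms", "10000"); // broker-side wait for followers
        return props;
    }

    public static void main(String[] args) {
        System.out.println(proposedDefaults().getProperty("batch.timeout.ms")); // prints 30000
    }
}
```

The separation means each phase (send(), accumulator, network, broker) is bounded independently, at the cost of four knobs instead of one.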
IMO, having 4 different timeouts makes it confusing for the user and requires the client to understand the internals of Kafka. We should have a single timeout from the user's perspective and handle the other timeouts, like a batch timeout, internally. Mayuresh
Hey Jay, I think that is a very reasonable concern. So the current behavior for those users is: 1. Send() will go through as long as metadata is available. 2. Send() will throw an exception if the metadata of a partition is lost after pre-initialization. 3. The messages in the accumulator will not be failed but will wait until the partition metadata is available again. If we reuse the metadata timeout, (1) and (2) stay the same. Only (3) changes, as those messages will be failed immediately when the batch is ready. It is probably not an issue though, because the user will get an exception from the send() call anyway in this case. It is probably better to fail the messages in the accumulator than to keep them in that case, because I really cannot think of any case where the metadata of a partition can disappear and come up again shortly. So I guess the metadata timeout does exactly what it means - how long you are willing to wait for metadata. It is not designed to provide a blocking boundary for send() - we have blocking on buffer full as well. It is just one of the dependencies of send(), so send() could be blocked for up to the metadata timeout. I totally agree we should explain all the timeouts clearly. I think we are fine as long as we make sure each configuration is used for what it sounds like it is used for and we articulate the impacts of those configurations. I’ll check what would happen if a broker is down when we try to connect to it as well. Thanks. Jiangjie (Becket) Qin On 5/19/15, 11:38 AM, Jay Kreps jay.kr...@gmail.com wrote: Here is the concern I had with reusing metadata.fetch.timeout.ms: Previously people were using this as a bound on the time send() would block. It isn't a bound on the time we will wait on a metadata request, just the time the send call will block if metadata is missing for the topic. We told people who wanted a guarantee of no blocking to basically preinitialize metadata and set this timeout to 0. 
However I think now this will have a slightly different side effect, which is to kill any request immediately for a leaderless partition even though that request is safely buffered in the record accumulator and no blocking will occur. People using the setting in the original way would now get a bit of a surprise. This may actually be okay, and there is always a tradeoff between simplicity and control. -Jay
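The accumulator behavior discussed above (fail a batch whose partition still has no leader once it has waited past the metadata timeout) could be sketched roughly as follows. The method and parameter names are hypothetical illustrations, not actual producer code.

```java
public class LeaderlessBatchExpiry {
    /**
     * Hypothetical version of the rule discussed: a batch whose partition
     * has no known leader is expired once its wait in the accumulator
     * exceeds the metadata timeout, since the situation is essentially
     * missing metadata.
     */
    static boolean shouldExpire(boolean leaderKnown, long waitedMs, long metadataTimeoutMs) {
        return !leaderKnown && waitedMs > metadataTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(shouldExpire(false, 70_000, 60_000)); // leaderless past timeout: true
        System.out.println(shouldExpire(true, 70_000, 60_000));  // leader known: false
    }
}
```

Note this is exactly the "surprise" Jay raises: with the timeout set to 0 to guarantee non-blocking send(), a buffered batch for a leaderless partition would expire immediately.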
Hi Jiangjie, So when you say "It is probably better to fail the messages in the accumulator than to keep them in that case because I really cannot think of any case where the metadata of a partition can disappear and come up again shortly" - this is true *unless there is a metadata refresh that occurs during that interval, right?* Thanks, Mayuresh
+1 on creating a new connection. So from what I understand, the request timeout should be greater than the replication timeout in any case. If the broker is slow or not responding and the request times out, we will treat it as we treat disconnections: update metadata and try sending to the new leader, or to the same broker on a new connection. Thanks, Mayuresh
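The constraint stated here (the client-side request timeout must exceed the broker-side replication timeout, otherwise the producer could give up while the broker is still legitimately waiting for followers) could be enforced with a check along these lines. This is a hypothetical sketch; validate is not a real client method.

```java
public class TimeoutSanityCheck {
    /**
     * Hypothetical validation of the constraint discussed: the client's
     * request timeout must be greater than the broker-side replication
     * timeout, or the producer may expire requests the broker would
     * still have completed.
     */
    static void validate(long requestTimeoutMs, long replicationTimeoutMs) {
        if (requestTimeoutMs <= replicationTimeoutMs) {
            throw new IllegalArgumentException(
                "request timeout (" + requestTimeoutMs + " ms) must be greater than "
                + "replication timeout (" + replicationTimeoutMs + " ms)");
        }
    }

    public static void main(String[] args) {
        validate(30_000, 10_000); // fine: 30s > 10s
        System.out.println("ok");
    }
}
```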
On Wed, May 6, 2015 at 6:06 PM, Jun Rao j...@confluent.io wrote: Jiangjie, Yes, I think using metadata timeout to expire batches in the record accumulator makes sense. Thanks, Jun
On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I incorporated Ewen and Guozhang’s comments in the KIP page. 
Want to speed up on this KIP because currently we experience mirror-maker hung very likely when a broker is down. I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata timeout to expire the batches which are sitting in accumulator without leader info. I did that because the situation there is essentially missing metadata. As a summary of what I am thinking about the timeout in new Producer: 1. Metadata timeout: - used in send(), blocking - used in accumulator to expire batches with timeout exception. 2. Linger.ms - Used in accumulator to ready the batch for drain 3. Request timeout - Used in NetworkClient to expire a batch and retry if no response is received for a request before timeout. So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). Would like to see what people think of above
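The migration plan quoted above (the deprecated timeout.ms overriding the new replication.timeout.ms, with a warning) could look roughly like the following sketch. The TimeoutConfigResolver class, its resolve signature, and the 30000 ms default are invented for illustration; this is not Kafka's actual implementation.

```java
// Hedged sketch of the proposed deprecation shim: if the deprecated
// TIMEOUT_CONFIG ("timeout.ms") is set, it overrides the new
// REPLICATION_TIMEOUT_CONFIG ("replication.timeout.ms") and we warn about it.
public class TimeoutConfigResolver {
    public static final String TIMEOUT_CONFIG = "timeout.ms";                     // deprecated name
    public static final String REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms";
    static final long DEFAULT_REPLICATION_TIMEOUT_MS = 30_000L;                   // assumed default

    /**
     * Resolves the effective replication timeout. Either argument may be null,
     * meaning that key was not configured; the deprecated key wins if present.
     */
    public static long resolve(Long deprecatedTimeoutMs, Long replicationTimeoutMs) {
        if (deprecatedTimeoutMs != null) {
            System.err.println("WARN: " + TIMEOUT_CONFIG + " is deprecated; please use "
                    + REPLICATION_TIMEOUT_CONFIG + " instead.");
            return deprecatedTimeoutMs; // old key overrides, per the migration plan
        }
        return replicationTimeoutMs != null ? replicationTimeoutMs : DEFAULT_REPLICATION_TIMEOUT_MS;
    }
}
```

Resolving once at configuration time, rather than consulting both keys throughout the code, keeps the removal in the release after 0.8.3 a one-line change.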
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
I think my confusion is coming from this: "So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG)."

There are three possible compatibility issues I see here:
* I assumed this meant the constants also change, so timeout.ms becomes replication.timeout.ms. This breaks config files that worked on the previous version, and the only warning would be in the release notes. We do warn about unused configs, so users might notice the problem.
* Binary and source compatibility, if someone configures their client in code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing jars to break when run against an updated client (which seems not very significant, since I doubt people upgrade these without recompiling, but maybe I'm wrong about that). It also breaks builds without having deprecated that field first, which again is probably not the biggest issue but is annoying for users; when we accidentally changed the API before, we received a complaint about breaking builds.
* Behavior compatibility, as Jay mentioned on the call: setting the config (even if the name changed) doesn't have the same effect it used to.

One solution, which admittedly is more painful to implement and maintain, would be to keep the timeout.ms config, have it override the others if it is specified (including an infinite request timeout, I guess?), and use the new config variables only if it isn't specified. Given a real deprecation schedule, users would have better warning of changes and a window to make them. I actually think it might not be necessary to maintain the old behavior precisely, although maybe for some code it is an issue if it starts seeing timeout exceptions it wouldn't have seen before?

-Ewen

On Wed, May 6, 2015 at 6:06 PM, Jun Rao j...@confluent.io wrote:
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
+1 Becket. That would give enough time for clients to move. We should make this change very clear. Thanks, Mayuresh
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Yes, that is the plan.

On 5/5/15, 8:23 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Just a quick question: can we handle a request timeout as a disconnection, do a fresh metadata request, and retry instead of failing the request? Thanks, Mayuresh
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jiangjie, Yes, I think using the metadata timeout to expire batches in the record accumulator makes sense. Thanks, Jun

On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Just a quick question: can we handle a request timeout as a disconnection, do a fresh metadata request, and retry instead of failing the request? Thanks, Mayuresh
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
I incorporated Ewen and Guozhang's comments in the KIP page. I want to speed this KIP up, because currently mirror maker very often hangs when a broker is down. I also took a shot at solving KAFKA-1788 in KAFKA-2142: I used the metadata timeout to expire batches that are sitting in the accumulator without leader info, because that situation is essentially missing metadata.

As a summary of what I am thinking about the timeouts in the new producer:
1. Metadata timeout: used in send(), blocking; also used in the accumulator to expire batches with a timeout exception.
2. linger.ms: used in the accumulator to ready a batch for draining.
3. Request timeout: used in NetworkClient to expire a batch and retry if no response is received for a request before the timeout.

So in this KIP we only address (3). The only public interface change is a new configuration for the request timeout (and maybe renaming TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). I would like to see what people think of the above approach. Jiangjie (Becket) Qin
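The metadata-timeout expiry described in the summary above (a batch sitting in the accumulator without leader info is failed once the metadata timeout elapses) can be sketched as a single predicate. The class and method names here are hypothetical, not Kafka's actual accumulator code:

```java
// Illustrative sketch, not Kafka's real implementation: a batch whose
// partition has no known leader is failed once the metadata timeout elapses.
public class BatchExpiryCheck {
    /** True if a leaderless batch should be failed with a timeout exception. */
    public static boolean shouldExpire(long nowMs, long batchCreatedMs,
                                       boolean leaderKnown, long metadataTimeoutMs) {
        // A batch with a known leader is drained normally (subject to linger.ms);
        // only leaderless batches are expired here, since that situation is
        // essentially missing metadata.
        return !leaderKnown && (nowMs - batchCreatedMs) >= metadataTimeoutMs;
    }
}
```

This captures the design choice in the thread: rather than a separate accumulator timeout, the existing metadata timeout doubles as the expiry bound for batches that cannot be sent for lack of a leader.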
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jun, I thought about this a little differently. Intuitively, if a partition is offline, the metadata for that partition should be considered not ready, because we don't know which broker we should send the message to; so those sends need to be blocked on the metadata timeout. Another thing I'm wondering is in which scenario an offline partition will come back online within a short period of time, and how likely that is. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds); otherwise batches waiting to be aborted will exhaust the shared buffer. That said, I do agree it is reasonable to buffer messages for some time so that messages to other partitions can still get sent. But adding another expiration on top of linger.ms, which is essentially a timeout itself, sounds a little confusing. Maybe we can do this: let the batch sit in the accumulator for up to linger.ms, then fail it if necessary. What do you think? Thanks, Jiangjie (Becket) Qin

On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:

On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, I took a quick look at the patch, and I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long; this KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested, but I'm not sure it is a good idea to let messages whose target partition is offline sit in the accumulator in the first place. Jiangjie (Becket) Qin

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Guozhang and Jiangjie, isn't this work covered by https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there? Thanks, Harsha

On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote: Thanks for the update, Jiangjie. I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki:
1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.
2. The producer already has a TIMEOUT_CONFIG, which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names, but it will reduce confusion moving forward.
Guozhang

On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I checked the code again. It seems the disconnected channel is not detected by the selector as expected. Currently we depend on the o.a.k.common.network.Selector.disconnected set to decide whether we need to do anything for a disconnected channel, but that set is only updated when:
1. a write/read/connect on the channel fails, or
2. a key is cancelled.
So when a broker goes down before sending back a response, the client does not seem able to detect the failure. I ran a simple test:
1. Run a selector on one machine and an echo server on another, and connect the selector to the echo server.
2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds.
3. After the server has received the message, unplug the network cable on the echo server machine.
4. Even after waiting 45 minutes, the selector still had not detected the network failure, and lsof on the selector machine showed the TCP connection as still ESTABLISHED.
I'm not sure what we should expect from java.nio.channels.Selector in this case. According to its documentation, the selector does not verify the status of the associated channel, and in my test it looks even worse: the OS did not consider the socket disconnected either. Anyway, it seems adding a client-side request timeout is necessary. I've updated the KIP page to clarify the problem we want to solve, according to Ewen's comments. Thanks. Jiangjie (Becket) Qin

On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jiangjie, Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too. Thanks, Jun On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I'm not sure it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place. Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Agreed, we also need to change the code in Sender.java to indicate that it refers to the REPLICATION_TIMEOUT and not the request timeout. Thanks, Mayuresh On Thu, Apr 16, 2015 at 1:08 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Guozhang, By implicit timeout for close() and flush(), I meant that we currently don't have an explicit timeout for close() or flush() when a broker is down, so they can take pretty long, up to the TCP timeout, which is hours as you mentioned. With the client-side request timeout, the waiting time would be bounded by the request timeout. And I agree we'd better change TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG to avoid confusion. Thanks. Jiangjie (Becket) Qin
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Harsha, Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I'm not sure it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place. Jiangjie (Becket) Qin On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Guozhang and Jiangjie, Isn't this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please review the patch there. Thanks, Harsha
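Purely as an illustration of the batch-level timeout idea floated above — this is not the actual RecordAccumulator API, and the class and method names here are made up — the check being debated amounts to an expiry predicate on how long a batch has sat in the accumulator:

```java
// Hypothetical batch-expiry predicate sketching a batch-level timeout:
// a batch that has sat in the accumulator longer than batchTimeoutMs
// would be failed back to the caller instead of waiting indefinitely.
final class BatchExpirySketch {
    static boolean isExpired(long createdMs, long nowMs, long batchTimeoutMs) {
        return nowMs - createdMs >= batchTimeoutMs;
    }

    public static void main(String[] args) {
        long created = 0L;
        // Batch has waited 5s of a 10s budget: still live.
        System.out.println(isExpired(created, 5_000L, 10_000L));
        // Batch has waited 12s: would be expired and failed.
        System.out.println(isExpired(created, 12_000L, 10_000L));
    }
}
```

This only shows the shape of the check; where such an expiry would hook into the send path is exactly what the thread is debating.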
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Guozhang, By implicit timeout for close() and flush(), I meant that we currently don't have an explicit timeout for close() or flush() when a broker is down, so they can take pretty long, up to the TCP timeout, which is hours as you mentioned. With the client-side request timeout, the waiting time would be bounded by the request timeout. And I agree we'd better change TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG to avoid confusion. Thanks. Jiangjie (Becket) Qin On 4/15/15, 10:38 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the update Jiangjie. I think it is actually NOT expected that hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout. I am not very clear on what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward. Guozhang
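To make the proposed renaming concrete, here is a hedged sketch of what producer properties might look like if the rename were adopted — key names as discussed in the thread, values purely illustrative, not recommendations:

```java
import java.util.Properties;

public class TimeoutConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Server-side timeout, today exposed via TIMEOUT_CONFIG: how long
        // the broker waits for follower replication before failing the
        // request. The thread proposes renaming it for clarity.
        props.put("replication.timeout.ms", "30000");
        // Proposed client-side timeout: how long the producer waits for a
        // response to an in-flight request before treating it as failed.
        props.put("request.timeout.ms", "15000");
        System.out.println(props.getProperty("replication.timeout.ms"));
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```

The point of the rename is that the two values bound different things: one is enforced by the broker, the other entirely by the client's network layer.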
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we depend on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when: 1. A write/read/connect to the channel failed. 2. A key is canceled. However, when a broker goes down before it sends back the response, the client seems unable to detect this failure. I did a simple test: 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server. 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds. 3. After the server received the message, unplug the cable on the echo server machine. 4. After waiting for 45 min, the selector still had not detected the network failure. lsof on the selector machine shows that the TCP connection is still considered ESTABLISHED. I'm not sure what we should expect from the java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test case it looks even worse: the OS did not consider the socket disconnected either. Anyway, it seems adding the client-side request timeout is necessary. I've updated the KIP page to clarify the problem we want to solve according to Ewen's comments. Thanks. Jiangjie (Becket) Qin On 4/14/15, 3:38 PM, Ewen Cheslack-Postava e...@confluent.io wrote: On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Ewen, thanks for the comments. Very good points! Please see replies inline. On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed?
Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding, but the TCP connection was still alive. Ok, that makes sense. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity, since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout? That is a very good point. We have three places where we might be able to enforce a timeout for a message send: 1. Before append to accumulator - handled by the metadata timeout at the per-message level. 2. Batch of messages inside the accumulator - no timeout mechanism now. 3.
Request of batches after messages leave the accumulator - we have a broker-side timeout but no client-side timeout for now. My current proposal only addresses (3), not (2). Honestly I do not have a very clear idea about what we should do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is that when a user calls send(), if we know that a partition is offline, we should throw an exception immediately instead of putting the record into the accumulator. This would prevent further memory consumption. We might also want to fail all the batches in the deque once we find a partition is offline. That said, I feel a timeout might not be quite applicable to (2). Do you have any suggestion on this? Right, I didn't actually mean to solve 2 here, but was trying to figure out if a solution to 2 would reduce what we needed to do to address 3. (And depending on how they are implemented, fixing 1 might also address 2.) It sounds like you hit a hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this
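The experiment described earlier in the thread can be approximated in-process — a silent local server stands in for the unplugged echo server, since a pulled cable cannot be reproduced in portable code, and the timings are shortened; all names here are hypothetical:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SelectorProbe {
    public static void main(String[] args) throws IOException {
        // Stand-in for the echo server: accepts the connection but never
        // responds, like a broker that is alive at the TCP level but silent.
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress("127.0.0.1", 0));

        SocketChannel client = SocketChannel.open(server.getLocalAddress());
        client.configureBlocking(false);
        server.accept(); // complete the handshake, then stay silent

        Selector selector = Selector.open();
        client.register(selector, SelectionKey.OP_READ);
        client.write(ByteBuffer.wrap("ping".getBytes()));

        // The poll() analogue: select() with a timeout. With a silent but
        // live peer it simply returns 0 ready keys; the Selector itself
        // never flags the connection as failed.
        int ready = selector.select(2000);
        System.out.println("ready keys: " + ready);
        System.out.println("connected: " + client.isConnected());
    }
}
```

Detecting the truly unplugged case would additionally require OS-level keepalives or, as the KIP proposes, an application-level request timeout.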
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Thanks for the update Jiangjie. I think it is actually NOT expected that hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout. I am not very clear on what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward. Guozhang
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jiangjie, Great start. I have a couple of comments. Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context. My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity, since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout? I know we have a similar setting, max.in.flight.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.
In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP altogether by just choosing a good default value for the timeout. -Ewen On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi, I just created a KIP to add a request timeout to NetworkClient for new Kafka clients. https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient Comments and suggestions are welcome! Thanks. Jiangjie (Becket) Qin -- Thanks, Ewen
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
I suppose adding this timeout will help. In cases where the broker is not completely down but stops responding to produce requests, tools like MirrorMaker will hang since they are waiting for responses. Adding this timeout enables them to fail the current request and retry with fresh metadata.

Thanks,

Mayuresh

On Mon, Apr 13, 2015 at 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

> [...]

--
-Regards, Mayuresh R. Gharat (862) 250-7125
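The fail-and-retry-with-fresh-metadata behavior Mayuresh describes can be sketched as a simple loop (purely illustrative: `send_fn` and `refresh_metadata_fn` are hypothetical stand-ins, not real client APIs):

```python
def send_with_timeout(send_fn, refresh_metadata_fn, retries=3):
    """Sketch of the retry-on-timeout behavior described above.

    send_fn and refresh_metadata_fn are stand-ins for the real
    network and metadata machinery; all names are illustrative.
    """
    for attempt in range(retries + 1):
        try:
            return send_fn()
        except TimeoutError:
            # Instead of hanging forever, fail this attempt and
            # refresh metadata in case the leader has moved.
            refresh_metadata_fn()
    raise TimeoutError("request timed out after %d attempts" % (retries + 1))


attempts = []

def flaky_send():
    # Hypothetical broker that ignores the first two requests.
    attempts.append(1)
    if len(attempts) < 3:
        raise TimeoutError("no response from broker")
    return "ack"


print(send_with_timeout(flaky_send, refresh_metadata_fn=lambda: None))  # → ack
```

Without the timeout there is nothing to raise, and the loop above degenerates into the indefinite hang observed in MirrorMaker.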
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

> Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
>
> On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:
>
>> Jiangjie,
>>
>> Great start. I have a couple of comments.
>>
>> Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.
>
> Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but the TCP connection was still alive, though.

Ok, that makes sense.

>> My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout?
>
> That is a very good point. We have three places where we might be able to enforce a timeout for a message send:
>
> 1. Before append to the accumulator - handled by the metadata timeout on a per-message level.
> 2. Batches of messages inside the accumulator - no timeout mechanism now.
> 3. Requests of batches after messages leave the accumulator - we have a broker-side timeout but no client-side timeout for now.
>
> My current proposal only addresses (3) but not (2). Honestly I do not have a very clear idea about what we should do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is that when the user calls send(), if we know that a partition is offline, we should throw an exception immediately instead of putting the record into the accumulator. This would prevent further memory consumption. We might also want to fail all the batches in the deque once we find a partition is offline. That said, I feel a timeout might not be quite applicable to (2). Do you have any suggestion on this?

Right, I didn't actually mean to solve 2 here, but was trying to figure out if a solution to 2 would reduce what we needed to do to address 3. (And depending on how they are implemented, fixing 1 might also address 2.) It sounds like you hit a hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this addresses. The cause of the hang may also be relevant -- if it was something like a deadlock then that's something that should just be fixed, but if it's something outside our control then a timeout makes a lot more sense.

>> I know we have a similar setting, max.in.flight.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.
>
> For the producer, max.in.flight.requests.per.connection really matters. If people do not want reordering of messages, they have to use max.in.flight.requests.per.connection=1. On the other hand, if throughput is more of a concern, it can be set higher. For the new consumer, I checked the value and I am not sure if the hard-coded max.in.flight.requests.per.connection=100 is the right value. Without the response to the previous request, what offsets should be put into the next fetch request? It seems to me the value will effectively be one regardless of the setting, unless we are sending fetch requests to different partitions, which does not look like the case. Anyway, it looks to be a separate issue, orthogonal to the request timeout.

>> In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP
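The reordering concern with max.in.flight.requests.per.connection > 1 can be seen in a toy simulation (this is not Kafka code; it just models a flight window where the first batch's first attempt fails and is retried):

```python
def deliver(batches, max_in_flight, fail_once=("A",)):
    """Toy model of why retries can reorder batches when more than one
    request may be in flight. Entirely illustrative, not real client code.
    """
    failed = set()   # batches whose first attempt has already failed
    log = []         # order in which batches land on the "broker"
    pending = list(batches)
    while pending:
        # Dispatch up to max_in_flight batches at once.
        window, pending = pending[:max_in_flight], pending[max_in_flight:]
        retries = []
        for batch in window:
            if batch in fail_once and batch not in failed:
                failed.add(batch)
                retries.append(batch)   # first attempt fails
            else:
                log.append(batch)       # attempt succeeds
        # A retried batch is resent before anything not yet dispatched,
        # but batches that shared its flight window have already landed.
        pending = retries + pending
    return log


print(deliver(["A", "B"], max_in_flight=2))  # → ['B', 'A']  (reordered)
print(deliver(["A", "B"], max_in_flight=1))  # → ['A', 'B']  (order kept)
```

With one request in flight, B is never dispatched until A's retry succeeds, which is why max.in.flight.requests.per.connection=1 preserves ordering at a throughput cost.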
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi Ewen, thanks for the comments. Very good points! Please see replies inline.

On 4/13/15, 11:19 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

> Jiangjie,
>
> Great start. I have a couple of comments.
>
> Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.

Yes, when the broker is completely down the request should be cleared as you said. The case we encountered looks like the broker was just not responding but the TCP connection was still alive, though.

> My second question is about whether this is the right level to tackle the issue/what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788 where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem or any problems caused by lack of connectivity since this would only apply to in-flight requests, which by definition must have been sent on an active connection. I suspect both types of problems probably need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementations of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout?

That is a very good point. We have three places where we might be able to enforce a timeout for a message send:

1. Before append to the accumulator - handled by the metadata timeout on a per-message level.
2. Batches of messages inside the accumulator - no timeout mechanism now.
3. Requests of batches after messages leave the accumulator - we have a broker-side timeout but no client-side timeout for now.

My current proposal only addresses (3) but not (2). Honestly I do not have a very clear idea about what we should do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is that when the user calls send(), if we know that a partition is offline, we should throw an exception immediately instead of putting the record into the accumulator. This would prevent further memory consumption. We might also want to fail all the batches in the deque once we find a partition is offline. That said, I feel a timeout might not be quite applicable to (2). Do you have any suggestion on this?

> I know we have a similar setting, max.in.flight.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.

For the producer, max.in.flight.requests.per.connection really matters. If people do not want reordering of messages, they have to use max.in.flight.requests.per.connection=1. On the other hand, if throughput is more of a concern, it can be set higher. For the new consumer, I checked the value and I am not sure if the hard-coded max.in.flight.requests.per.connection=100 is the right value. Without the response to the previous request, what offsets should be put into the next fetch request? It seems to me the value will effectively be one regardless of the setting, unless we are sending fetch requests to different partitions, which does not look like the case. Anyway, it looks to be a separate issue, orthogonal to the request timeout.

> In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP altogether by just choosing a good default value for the timeout.

The problem is that we have a server-side request timeout exposed as a public configuration. We cannot set the client timeout smaller than that value, so a hard-coded value probably won't work here.

> -Ewen
>
> On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:
>
>> Hi,
>>
>> I just created a KIP to add a request timeout to NetworkClient for new Kafka clients.
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
>>
>> Comments and suggestions are welcome!
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>
> --
> Thanks,
> Ewen
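The three enforcement points above translate into a back-of-the-envelope bound on how long a record can take end to end (a sketch with illustrative parameter names, not real configs; stage (2) is the term that currently has no real bound, so `accumulator_wait_ms` is a placeholder):

```python
def worst_case_send_latency_ms(metadata_timeout_ms, accumulator_wait_ms,
                               request_timeout_ms, retries):
    """Rough upper bound on end-to-end record latency, per the three
    stages discussed above. accumulator_wait_ms stands in for stage (2),
    which has no timeout today and is therefore unbounded in the worst
    case. All names are illustrative.
    """
    return (metadata_timeout_ms                 # stage (1): before append
            + accumulator_wait_ms               # stage (2): in accumulator
            + (retries + 1) * request_timeout_ms)  # stage (3): in flight, per attempt


# e.g. 60s metadata timeout, 30s request timeout, 2 retries:
print(worst_case_send_latency_ms(60_000, 0, 30_000, retries=2))  # → 150000
```

The `(retries + 1) * request_timeout_ms` term is why the per-retry semantics of the proposed timeout matter: the client-side request timeout bounds each attempt, not the whole send.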
[DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hi,

I just created a KIP to add a request timeout to NetworkClient for new Kafka clients.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

Comments and suggestions are welcome!

Thanks.

Jiangjie (Becket) Qin