[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15483605#comment-15483605 ]

Bence Varga commented on KAFKA-1835:

I can see two actions that can resolve this issue:

A) Remove networking from the foreground thread. Neither the _constructor_ nor the call to send() should block, and definitely not when using the default configuration.
B) Remove any reference to "async" and "non-blocking" behaviour from the API docs, as it is very misleading.

Note: setting "metadata.fetch.timeout.ms" to zero will render the client unworkable.

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2.0, 0.9.0.0, 0.10.1.0
>            Reporter: Paul Pearcy
>             Fix For: 0.9.0.0
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to
> retrieve metadata for a topic. This is not the desired behavior in some use
> cases where async non-blocking guarantees are required and message loss is
> acceptable in known cases. Also, most developers will assume an API that
> returns a future is safe to call in a critical request path.
>
> Discussing on the mailing list, the most viable option is to have the
> following settings:
>
> pre.initialize.topics=x,y,z
> pre.initialize.timeout=x
>
> This moves potential blocking to the init of the producer and outside of some
> random request. The potential will still exist for blocking in a corner case
> where connectivity with Kafka is lost and a topic not included in pre-init
> has a message sent for the first time.
>
> There is the question of what to do when initialization fails. There are a
> couple of options that I'd like available:
> - Fail creation of the client
> - Fail all sends until the metadata is available
>
> Open to input on how the above options should be expressed.
>
> It is also worth noting that more nuanced solutions exist that could work
> without the extra settings; they just end up having extra complications and,
> at the end of the day, not adding much value. For instance, the producer could
> accept and queue messages (note: more complicated than I am making it sound
> due to storing all accepted messages in pre-partitioned compact binary form),
> but you're still going to be forced to choose to either start blocking or
> dropping messages at some point.
>
> I have some test cases I am going to port over to the Kafka producer
> integration ones and start from there. My current impl is in Scala, but
> porting to Java shouldn't be a big deal (was using a promise to track init
> status, but will likely need to make that an atomic bool).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
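The pre-initialization idea in the issue description (warm up metadata for a fixed topic list within a timeout, then either fail creation or fail sends) could look roughly like the sketch below. It is not the attached patch and not Kafka code: `PreInitGate`, `OnFailure`, and the `fetchMetadata` callback are hypothetical names invented here, and the callback stands in for whatever blocking metadata lookup the producer would perform. It uses an atomic boolean for the init status, as the description suggests.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper: moves the blocking metadata fetch to construction time.
class PreInitGate {
    enum OnFailure { FAIL_CREATION, FAIL_SENDS }

    // Flips to true once metadata for every listed topic has been fetched.
    private final AtomicBoolean ready = new AtomicBoolean(false);

    PreInitGate(List<String> topics, Consumer<String> fetchMetadata,
                long timeoutMs, OnFailure policy) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "pre-init");
            t.setDaemon(true); // do not keep the JVM alive for a stuck fetch
            return t;
        });
        Future<?> warmup = pool.submit(() -> {
            topics.forEach(fetchMetadata); // may block; runs off the caller's thread
            ready.set(true);
        });
        pool.shutdown(); // the submitted task still runs to completion
        try {
            warmup.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            if (policy == OnFailure.FAIL_CREATION) {
                warmup.cancel(true);
                throw new IllegalStateException("pre-initialization failed", e);
            }
            // FAIL_SENDS: keep the gate closed; a slow warm-up may still open it later.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during pre-initialization", e);
        }
    }

    boolean isReady() {
        return ready.get();
    }

    // Call before each send: fails fast instead of blocking the request path.
    void checkCanSend() {
        if (!ready.get()) {
            throw new IllegalStateException("metadata not available yet");
        }
    }
}
```

With `FAIL_SENDS`, a warm-up that merely timed out keeps running in the background, so sends start succeeding once it finishes; a warm-up that threw leaves the gate closed for good in this simplified version.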
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662476#comment-14662476 ]

Mayuresh Gharat commented on KAFKA-1835:

[~stevenz3wu] KAFKA-2120 has max.block.ms, which will be used in metadataTimeOut. The waitOnMetadata() in send() will wait for max.block.ms, after which it should throw a TimeoutException.

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
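As a rough illustration of the max.block.ms setting mentioned in the comment above, the sketch below only builds the producer configuration; it does not create a producer or show how send() reports the timeout. `BoundedBlockingConfig` and `producerProps` are names made up for this example, and the serializer choice is an assumption.

```java
import java.util.Properties;

// Hypothetical helper that assembles producer settings with a bounded block time.
class BoundedBlockingConfig {
    static Properties producerProps(String bootstrapServers, long maxBlockMs) {
        if (maxBlockMs < 0) {
            throw new IllegalArgumentException("maxBlockMs must be >= 0");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Upper bound on how long the calling thread may wait, e.g. for metadata.
        props.setProperty("max.block.ms", Long.toString(maxBlockMs));
        // Assumed serializers; any serializer classes would do here.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The returned `Properties` would be handed to the producer constructor. A small value trades a better chance of obtaining metadata for a shorter worst-case stall on the calling thread.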
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662607#comment-14662607 ]

Steven Zhen Wu commented on KAFKA-1835:
---------------------------------------

[~mgharat] thanks for the clarification.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662427#comment-14662427 ]

Steven Zhen Wu commented on KAFKA-1835:
---------------------------------------

[~junrao] I think this is a different issue. Here the send() call can block waiting for metadata, which is not a request timeout. It will wait for as long as metadata for the topic is not available.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659965#comment-14659965 ]

Stefan Miklosovic commented on KAFKA-1835:
------------------------------------------

Any progress? It is weird that there is no action after such a massive information exchange. Are you stuck somewhere?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14630874#comment-14630874 ]

Guozhang Wang commented on KAFKA-1835:
--------------------------------------

Hmm, so although partitionsFor() returns you a future, if max.block.ms is set to 0 the subsequent future.isDone() will always return true?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14631509#comment-14631509 ]

Jiangjie Qin commented on KAFKA-1835:
-------------------------------------

I am thinking partitionsFor() will not use max.block.ms. It just requests a metadata update and then returns the future. The future will be populated by the sender thread after the metadata gets refreshed. So if users want blocking behavior, they can just call future.get(); otherwise they can call future.isDone().
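The non-blocking partitionsFor() described in the comment above could be sketched as follows. This is an illustration of the idea only, not the producer's real metadata code: `AsyncMetadata`, `onMetadataUpdate`, and the use of `CompletableFuture` are assumptions, and the "sender thread" is represented by whoever calls `onMetadataUpdate`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical metadata holder: lookups never block the calling thread.
class AsyncMetadata {
    private final Map<String, List<Integer>> cache = new ConcurrentHashMap<>();
    private final Map<String, CompletableFuture<List<Integer>>> pending =
            new ConcurrentHashMap<>();

    // Answers from the local cache, or registers a pending request and returns.
    CompletableFuture<List<Integer>> partitionsFor(String topic) {
        List<Integer> cached = cache.get(topic);
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        CompletableFuture<List<Integer>> future =
                pending.computeIfAbsent(topic, t -> new CompletableFuture<>());
        // Re-check: an update may have landed between the two lookups above.
        List<Integer> late = cache.get(topic);
        if (late != null) {
            future.complete(late);
            pending.remove(topic, future);
        }
        return future;
    }

    // Called from the background (sender) thread when fresh metadata arrives.
    void onMetadataUpdate(String topic, List<Integer> partitions) {
        cache.put(topic, partitions);
        CompletableFuture<List<Integer>> waiting = pending.remove(topic);
        if (waiting != null) {
            waiting.complete(partitions);
        }
    }
}
```

A caller that wants blocking behavior calls `get()` on the returned future, optionally with a timeout; a caller on a critical request path polls `isDone()` and moves on if the metadata is not there yet.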
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629780#comment-14629780 ]

Ismael Juma commented on KAFKA-1835:

Another advantage of Future-based APIs when compared to exception-based ones is that they compose better. And it's easy to build generic combinators that aid in simplifying user code (e.g. CompletableFuture in Java 8, Future in Scala and to a lesser extent Guava's Futures/ListenableFuture). Sadly Java's Future is a bit anemic in this regard.
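To make the composition point above concrete, here is a small sketch using Java 8's CompletableFuture. The class and method names (`FutureComposition`, `describe`, `maxOffset`) are invented for the example, and the futures stand in for the result of an asynchronous send; nothing here is Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical example of combinators over futures that carry a record offset.
class FutureComposition {
    // Maps success and failure into one value without try/catch at the call site.
    static CompletableFuture<String> describe(CompletableFuture<Long> offsetFuture) {
        return offsetFuture
                .thenApply(offset -> "stored at offset " + offset)
                .exceptionally(err -> "send failed: " + rootCause(err).getMessage());
    }

    // Dependent stages wrap failures in CompletionException; unwrap one level.
    static Throwable rootCause(Throwable err) {
        return (err instanceof CompletionException && err.getCause() != null)
                ? err.getCause()
                : err;
    }

    // Combines two independent results once both are available.
    static CompletableFuture<Long> maxOffset(CompletableFuture<Long> a,
                                             CompletableFuture<Long> b) {
        return a.thenCombine(b, (x, y) -> Math.max(x, y));
    }
}
```

The same logic with the plain `java.util.concurrent.Future` interface would need a blocking `get()` plus exception handling at every step, which is the "anemic" part.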
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629874#comment-14629874 ]

David Hay commented on KAFKA-1835:
----------------------------------

If I understand correctly, the proposal in KIP-19 is to add some additional timeout parameters around sending messages (e.g. the amount of time a message can sit in the accumulator waiting for a broker to send to) as well as the max time to block waiting for things like broker metadata, etc.

While this is closer, I would still feel obligated to wrap the send() request with my own threading, as I want to give the send request the best opportunity to succeed but not delay the user's request. That is, I wouldn't want to set max.block.ms to 0, as I'd want to give the producer the opportunity to get the broker metadata, if needed, and not fail right away. I just want this request to run in a separate thread and then propagate the TimeoutException to the Callback provided to the send() method.
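The wrapping described in the comment above (run the possibly blocking send on another thread and hand any failure to the callback) might be sketched like this. `OffThreadSend`, `BlockingSend`, and `sendAsync` are hypothetical names, and the `BlockingSend` interface stands in for a real producer call; the callback shape of (offset, exception) is an assumption.

```java
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Hypothetical wrapper: keeps blocking out of the caller's request thread.
class OffThreadSend {
    // Stand-in for a send that may block (e.g. on metadata) and may time out.
    interface BlockingSend<T> {
        long send(T record) throws Exception;
    }

    static <T> void sendAsync(Executor executor, BlockingSend<T> sender, T record,
                              BiConsumer<Long, Exception> callback) {
        executor.execute(() -> {
            Long offset = null;
            Exception error = null;
            try {
                offset = sender.send(record); // blocking happens on the executor thread
            } catch (Exception e) {
                error = e; // includes timeouts raised while waiting for metadata
            }
            // Exactly one of the two arguments is non-null.
            callback.accept(offset, error);
        });
    }
}
```

The executor needs a bounded work queue in practice, otherwise the blocking has only been traded for unbounded memory growth while metadata is unavailable.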
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629988#comment-14629988 ]

Mayuresh Gharat commented on KAFKA-1835:

I had a concern here: what happens if the queue that you are putting the send requests into gets full? Do we fail those or block on those? For example, if you go on queuing send requests because the metadata is not available and the queue gets full (I assume that we will have some restrictions on the queue size). Are we guaranteeing that we will get the metadata successfully before the queue gets full?

Thanks,
Mayuresh
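The two answers to the question above (fail when the queue is full, or block for a bounded time) can be illustrated with a plain bounded queue. This is only a sketch of the choice itself; `BoundedSendQueue` and its methods are hypothetical and unrelated to the producer's actual record accumulator.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical bounded queue of pending sends with two overflow policies.
class BoundedSendQueue<T> {
    private final BlockingQueue<T> queue;

    BoundedSendQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Fail-fast policy: returns false immediately when the queue is full.
    boolean tryEnqueue(T record) {
        return queue.offer(record);
    }

    // Bounded-blocking policy: waits up to maxBlockMs for space, then gives up.
    boolean enqueue(T record, long maxBlockMs) {
        try {
            return queue.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return queue.size();
    }
}
```

Neither policy guarantees that metadata arrives before the queue fills; a `false` return is the point where the caller has to drop the record or surface an error.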
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14629014#comment-14629014 ]

Jiangjie Qin commented on KAFKA-1835:
-------------------------------------

Oh, I see your concern. My understanding is that if partitionsFor() returns a future it will be completely non-blocking. All it does is check the local metadata cache and set the update-metadata flag if needed, then return a future that will be completed by the sender thread later. Is there something that you see might cause blocking?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14628413#comment-14628413 ] Guozhang Wang commented on KAFKA-1835: -- I agree that either way there will be an API change. Given that, I think the question is how could we make it a bit easier and perhaps more intuitive for users to code. Personally I prefer exception over future since I feel it is actually less clear of a function definition that partitionsFor could also be possibly blocked for up to a configured timeout value, yet it returns you a future, plus the blocking behavior is not controllable dynamically at runtime since it is defined by configs, right? For [~becket_qin]'s follow-up case 3) example, in the exception approach it will become : {code} try { send(..); // send to any partition of the topic } catch (TopicUnknownException tue){ // handle exception } catch (BufferExhaustedException bee) { // handle exception.. } catch (//other exceptions) { .. } {code} And if the user really wants to send to a specific partition, I think they need to expect blocking somehow since in that case they really want to make sure the topic exists already, hence calling a blocking partitionsFor() in order to figure out the partition id would be fine. For the same reason above, I would also prefer to NOT let partitionsFor's blocking behavior be bounded by max.block.ms as well, but be blocking-until-topic-available if it does not return the Future but the list of partitions directly. Regarding the preinit.topics config, I think it is completely fine to add it if users do want that, and per implementation it would be blocking partitionsFor() calls in the constructor, since it is less likely that users want to complete the initialization of the producer if the specified topics are not even available to send yet. 
Kafka new producer needs options to make blocking behavior explicit
---
Key: KAFKA-1835
URL: https://issues.apache.org/jira/browse/KAFKA-1835
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
Reporter: Paul Pearcy
Fix For: 0.8.3
Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
Original Estimate: 504h
Remaining Estimate: 504h

The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.

Discussing on the mailing list, the most viable option is to have the following settings:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.

There is the question of what to do when initialization fails. There are a couple of options that I'd like available:
- Fail creation of the client
- Fail all sends until the meta is available

Open to input on how the above option should be expressed.

It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.

I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628452#comment-14628452 ]

Jiangjie Qin commented on KAFKA-1835:

Hey Guozhang, from your proposal it is not clear to me how we would address the opposite blocking requirements of partitionsFor() in use case (2) and use case (3). If partitionsFor() is a blocking call, does that mean that users who simply want to get the partition info of a topic (e.g. to verify whether the topic exists) have to block on partitionsFor()? It looks like this would break the requirements of use case (3).
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628510#comment-14628510 ]

Guozhang Wang commented on KAFKA-1835:

Jiangjie, I was trying to argue that those users would just try send() and catch exceptions; if it is TopicUnknown, they do the same thing as they would in your approach if partitionsFor() returns false. And I think it is OK to expose such exceptions since the topic / partition concepts are exposed to users as well. If users do not want to always get a TopicUnknown exception upon the first send, they'd have to block until the topic is available, hence a blocking partitionsFor(). Make sense?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628621#comment-14628621 ]

Jiangjie Qin commented on KAFKA-1835:

Guozhang, what you said makes sense, and I also realized use case (3) is not a good example for a non-blocking partitionsFor(). What I want to say is that partitionsFor() as a standalone function could be called by users even without sending data; in that case, users might also want non-blocking behavior. For example, consider use case (4): the user calls partitionsFor() simply to update topic information they maintain themselves. In this case, it seems a blocking partitionsFor() won't work for them.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628656#comment-14628656 ]

David Hay commented on KAFKA-1835:

With this example, we would still be required to wrap send() with our own threading code to make it completely asynchronous. Since send() returns a Future, there is an implicit assumption that it will never block for anything. In our case, we want the call to send() to be completely asynchronous so that we don't impact end-user performance. We deal with the exception in a reactive manner, using the callback variant of send(). As long as send() has the potential to block, even if it's just the first send, we'll have to wrap the call with our own threading logic.
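The kind of wrapper described in this comment could look roughly like the sketch below. It is only an illustration of the workaround, not code from the thread: `OffloadedSender` and `sendAsync` are hypothetical names, and the possibly blocking send() is represented by a plain `Function` so that the example runs without the Kafka client on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical wrapper that moves a possibly blocking send onto its own thread. */
final class OffloadedSender<R, M> implements AutoCloseable {
    // Stand-in for a producer send() that may block while fetching metadata.
    private final Function<R, M> blockingSend;

    // Single daemon thread, so the wrapper never keeps the JVM alive on its own.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
        Thread t = new Thread(task, "send-offload");
        t.setDaemon(true);
        return t;
    });

    OffloadedSender(Function<R, M> blockingSend) {
        this.blockingSend = blockingSend;
    }

    /** Returns immediately; the future completes or fails on the offload thread. */
    CompletableFuture<M> sendAsync(R record) {
        return CompletableFuture.supplyAsync(() -> blockingSend.apply(record), executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}
```

A caller would attach its error handling with `whenComplete` or `handle` on the returned future, which plays the role of the callback variant of send() mentioned above. Note that the executor's work queue is unbounded in this sketch, so a real wrapper would still have to decide when to start dropping or rejecting records.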
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628960#comment-14628960 ]

Guozhang Wang commented on KAFKA-1835:

Hey [~dhay], I think the proposal in KIP-19 will address your problem: you can configure your producers to never block, i.e. if the topic is not known, throw the exception immediately.
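As a rough illustration of the configuration being suggested, the sketch below builds producer properties with `max.block.ms` set to zero, the setting discussed elsewhere in this thread in connection with KIP-19. The class name and the broker address used in the usage note are made up for the example, and the exact behavior of a zero timeout depends on the client version, so treat this as an assumption to verify rather than a recipe.

```java
import java.util.Properties;

/** Hypothetical helper that assembles producer settings intended to fail fast. */
final class NonBlockingProducerProps {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long the calling thread may block; zero means
        // "fail immediately instead of waiting for metadata or buffer space".
        props.setProperty("max.block.ms", "0");
        return props;
    }
}
```

The resulting `Properties` object (for instance from `NonBlockingProducerProps.build("broker1:9092")`) would be passed to the producer constructor, and the caller then has to be prepared for an exception on a send to a topic whose metadata has not been fetched yet.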
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628983#comment-14628983 ]

Guozhang Wang commented on KAFKA-1835:

Hi Jiangjie, yeah that makes more sense to me. I think for this case we would have a separate non-blocking function that immediately returns null if the topic is not in the metadata yet. For example, we can make partitionsFor() non-blocking and add two other functions, ListTopics() and waitForTopic(String), which block on one round trip of metadata refresh and on metadata refreshes until the topic is added, respectively. I feel that providing a completely non-blocking partitionsFor() paired with a blocking ListTopics() is better than combining them into one function that potentially blocks but still returns a future.
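The split proposed here, a lookup that never blocks next to a separate call that does, can be sketched with a small metadata cache. Everything below is hypothetical: `TopicMetadataCache` is not a Kafka class, `update` stands in for whatever the background metadata thread would do, and the timeout argument on `waitForTopic` is an addition made so the example cannot hang.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical cache showing a non-blocking lookup next to a blocking wait. */
final class TopicMetadataCache {
    private final Map<String, List<Integer>> partitionsByTopic = new ConcurrentHashMap<>();
    private final Object lock = new Object();

    /** Non-blocking: returns null when the topic is not in the cached metadata. */
    List<Integer> partitionsFor(String topic) {
        return partitionsByTopic.get(topic);
    }

    /** Called by the (assumed) background metadata thread after a refresh. */
    void update(String topic, List<Integer> partitions) {
        synchronized (lock) {
            partitionsByTopic.put(topic,
                    Collections.unmodifiableList(new ArrayList<>(partitions)));
            lock.notifyAll(); // wake up any caller parked in waitForTopic()
        }
    }

    /** Blocking: waits until the topic is known; null on timeout or interrupt. */
    List<Integer> waitForTopic(String topic, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (lock) {
            List<Integer> partitions;
            while ((partitions = partitionsByTopic.get(topic)) == null) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null;
                }
                try {
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return null;
                }
            }
            return partitions;
        }
    }
}
```

With this shape the caller chooses the blocking behavior per call site rather than through configuration, which is the point being argued in the comment.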
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14628122#comment-14628122 ]

David Hay commented on KAFKA-1835:

It seems to me that there are two aspects to sending a message: (a) selecting a partition and (b) sending the message to the partition leader. Currently, (b) is performed asynchronously, while (a) is not and contains a potentially blocking call to get (or refresh) partition metadata.

Maybe I'm missing something, but why couldn't the producer just queue the ProducerRecord and have a separate thread take care of querying for broker metadata, serializing the key and message, selecting a partition and pushing the result onto the send queue? In other words, put the current implementation of {{send()}} in a separate thread? This is essentially what we've had to do in order to work around the possibility of {{send()}} blocking when we don't want it to. In addition, it puts the serialization process in a separate thread, which is important when we need to send a message and respond to user input as quickly as possible.
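A minimal sketch of the design suggested in this comment follows: the caller only enqueues the raw record, and a background thread serializes it and picks a partition. All names are hypothetical, the metadata lookup is reduced to a fixed partition count, and `hashCode()` is only a stand-in for a real partitioner. The bounded queue makes the trade-off from the issue description visible: when it is full, `offer` returns false and the caller has to drop or block.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical front end: accept records cheaply, prepare them off-thread. */
final class QueueingFrontEnd {
    /** Stand-in for a serialized record that already has a partition assigned. */
    static final class Prepared {
        final int partition;
        final byte[] payload;

        Prepared(int partition, byte[] payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    private final BlockingQueue<String[]> accepted;   // bounded: {key, value}
    private final BlockingQueue<Prepared> sendQueue = new LinkedBlockingQueue<>();
    private final int numPartitions;                  // assumed known; no metadata fetch here

    QueueingFrontEnd(int capacity, int numPartitions) {
        this.accepted = new ArrayBlockingQueue<>(capacity);
        this.numPartitions = numPartitions;
    }

    /** Never blocks; returns false when the queue is full and the record is dropped. */
    boolean offer(String key, String value) {
        return accepted.offer(new String[] {key, value});
    }

    /** Serialization and partition selection, kept off the caller's thread. */
    private void prepare(String[] kv) {
        byte[] payload = kv[1].getBytes(StandardCharsets.UTF_8);
        int partition = Math.floorMod(kv[0].hashCode(), numPartitions);
        sendQueue.add(new Prepared(partition, payload));
    }

    /** Processes one accepted record if there is one; useful for tests. */
    boolean drainOne() {
        String[] kv = accepted.poll();
        if (kv == null) {
            return false;
        }
        prepare(kv);
        return true;
    }

    /** Starts a daemon thread that prepares records as they arrive. */
    Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    prepare(accepted.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly when interrupted
            }
        }, "record-preparer");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    /** Next prepared record for the network thread, or null if none is ready. */
    Prepared nextPrepared() {
        return sendQueue.poll();
    }
}
```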
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626122#comment-14626122 ]

Stefan Miklosovic commented on KAFKA-1835:

Awesome discussion, guys! I love having it spelled out so clearly and at such a low level; I did not have a clear picture of what was happening before.

I hand messages to the new producer from another part of my application. If I wrap the producer call in a try-catch and act accordingly when the producer fails to fetch metadata, I can report back to the original caller that the request to send data to the topic was unsuccessful. As I understand it, when the producer fails to get information about a topic on the first send, that failure triggers a new metadata request, so subsequent send requests should be OK.

I do not mind the initial blocking behaviour at all. However, I am not sure why it times out in the first place: if the topics are up and running and communication from producer to broker is fine, I do not see any reason why it should time out after 60 seconds.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14626627#comment-14626627 ]

Guozhang Wang commented on KAFKA-1835:

[~becket_qin] [~smiklosovic] [~ewencp] I think a related issue that has also been discussed is KAFKA-1788 / KAFKA-2142: even if a partition is available when we do the check in send() and hence append the record to the buffer, it could still become unavailable (even forever) by the time the sender thread tries to drain the buffer and send to the brokers. So, thinking about it a bit more, I feel that forcing the producer to only enqueue records when the partition is known cannot really provide any strict guarantees, and we can probably skip the check and instead use the timeout value to drop buffered messages and trigger callbacks with an error, as proposed in KAFKA-2142.
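The alternative described in this comment, accepting records unconditionally and expiring them from the buffer after a timeout, might be sketched as follows. This is an illustration only and not the KAFKA-2142 implementation: `ExpiringBuffer` is a hypothetical class, records are plain strings, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/** Hypothetical buffer that drops records older than a timeout and reports the error. */
final class ExpiringBuffer {
    private static final class Entry {
        final String value;
        final long enqueuedAtMs;
        final BiConsumer<String, Exception> callback;

        Entry(String value, long enqueuedAtMs, BiConsumer<String, Exception> callback) {
            this.value = value;
            this.enqueuedAtMs = enqueuedAtMs;
            this.callback = callback;
        }
    }

    private final Deque<Entry> buffer = new ArrayDeque<>();
    private final long timeoutMs;

    ExpiringBuffer(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Always accepts the record; no metadata check happens at enqueue time. */
    void append(String value, long nowMs, BiConsumer<String, Exception> callback) {
        buffer.addLast(new Entry(value, nowMs, callback));
    }

    /** Drops every record that has waited at least timeoutMs; returns how many. */
    int expire(long nowMs) {
        int dropped = 0;
        // Entries are in arrival order, so the oldest one is always at the head.
        while (!buffer.isEmpty() && nowMs - buffer.peekFirst().enqueuedAtMs >= timeoutMs) {
            Entry expired = buffer.pollFirst();
            expired.callback.accept(expired.value,
                    new TimeoutException("record expired after " + timeoutMs + " ms in buffer"));
            dropped++;
        }
        return dropped;
    }

    int size() {
        return buffer.size();
    }
}
```

In a real producer the sender thread would call something like `expire` on each pass over the buffer, so a partition that never becomes available results in a callback error rather than a blocked caller.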
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627184#comment-14627184 ]

Jiangjie Qin commented on KAFKA-1835:

We discussed this in today's KIP hangout. To summarize the discussion:

In the producer, the following methods can potentially block:
- send()
- partitionsFor()

According to KIP-19, all of these methods will use max.block.ms to bound the blocking time. But this will cause users who set max.block.ms=0 to receive an exception on the first call to those methods, because no metadata is available when the methods are invoked for the first time. While there is a workaround for users to handle this, it is not an ideal API.

For the improvement, there are several use cases to consider:
1. The user can tolerate blocking.
2. The user doesn't want to block on send(), but can accept blocking on partitionsFor().
3. The user doesn't want to block on either send() or partitionsFor().

After discussion within LinkedIn, we found that letting partitionsFor() return a future might be a good solution that addresses all the requirements:
For (1), the user can set max.block.ms to a non-zero value and just do a normal send.
For (2), the user can set max.block.ms=0 and call partitionsFor().get() followed by send().
For (3), the user can set max.block.ms=0, call partitionsFor() followed by send(), and handle all the exceptions on their own.

So with this proposal, max.block.ms will only be used by send(). A similar proposal applies to KAFKA-2275.
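To make use cases (2) and (3) concrete, the sketch below shows how a caller might consume a future-returning partitionsFor(). It assumes the proposal above rather than any shipped API: the future is modelled as a `CompletableFuture<List<Integer>>` of partition ids, and both helper names are hypothetical.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical call patterns for a partitionsFor() that returns a future. */
final class PartitionsForPatterns {
    /** Use case (2): block on the metadata future up front, then send without blocking. */
    static int blockThenPick(CompletableFuture<List<Integer>> partitionsFuture) {
        List<Integer> partitions = partitionsFuture.join(); // may block here, by choice
        return partitions.get(0);
    }

    /** Use case (3): never block; use the metadata only if it has already arrived. */
    static Optional<Integer> pickIfReady(CompletableFuture<List<Integer>> partitionsFuture) {
        if (!partitionsFuture.isDone() || partitionsFuture.isCompletedExceptionally()) {
            return Optional.empty(); // caller handles "not ready" or "failed" itself
        }
        List<Integer> partitions = partitionsFuture.join(); // already complete, no wait
        return partitions.isEmpty() ? Optional.empty() : Optional.of(partitions.get(0));
    }
}
```

The difference between the two helpers is only where the waiting happens, which matches the summary above: the future lets each caller decide, and max.block.ms then governs send() alone.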
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627279#comment-14627279 ]

Guozhang Wang commented on KAFKA-1835:

Thanks [~becket_qin] for the summary. Some follow-up comments regarding the new API:

I think case (3) is only for users who are willing to drop messages if topic metadata is not present, and who hence call partitionsFor to get the partition info for the topic; if it is not present, they just drop the messages and move forward. To cover this case, the following pattern:

{code}
partitionsFor(topic);
send(new Record(topic, key, value))
{code}

is a little bit awkward to me. Since the user does not care whether the topic metadata is really present and is willing to drop messages if not, it makes little sense to call partitionsFor and then check nothing on the return value before sending anyway (the awkwardness is probably due to the function name). In addition, the behavior of partitionsFor(empty-list) as proposed in KAFKA-2275 is also not clear.

Personally I feel it is more intuitive from the user's point of view to define the following:

1. max.block.ms only controls send() calls, which, depending on whether its value is 0 or not, throw exceptions immediately or wait on a metadata refresh if the topic metadata is not available.

2. partitionsFor(single-topic) blocks UNTIL the topic metadata is valid; this function will only ever be called by users of cases (1) and (2).

* If we decide to let max.block.ms control only send() calls, we may consider not making it a config but rather modifying the API to send(record, timeout, callback).

3. For KAFKA-2275 on consumers, still add the listTopics(timeout) API, which blocks for at most the timeout value. The consumer's partitionsFor() API will only ever be called by users who want to consume specific topics and also have customized partition management, and who hence should tolerate blocking as well (i.e. you cannot really be non-blocking at all if you want to consume specific topic-partitions, because you do not know when these partitions will be available).
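The send(record, timeout, callback) idea mentioned above, where the blocking bound is a per-call argument rather than a config, could look roughly like the following. This is only a sketch: `TimedSendSketch` and its `partitionCount` future are hypothetical, the record is reduced to a string value, and the callback is invoked synchronously here, whereas a real producer would invoke it once the broker acknowledges the record.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical sketch of a per-call timeout; not the Kafka producer API.
final class TimedSendSketch {
    // Completed by a (simulated) background thread once topic metadata arrives.
    final CompletableFuture<Integer> partitionCount = new CompletableFuture<>();

    // timeoutMs == 0 never waits; timeoutMs > 0 waits at most that long for metadata.
    void send(String value, long timeoutMs, BiConsumer<Integer, Exception> callback) {
        Integer count;
        try {
            count = timeoutMs == 0
                    ? partitionCount.getNow(null)
                    : partitionCount.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            count = null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            count = null;
        }
        if (count == null) {
            throw new IllegalStateException("topic metadata not available");
        }
        // Report the chosen partition; no error in this simplified sketch.
        callback.accept(Math.floorMod(value.hashCode(), count), null);
    }
}
```

The design point is that the same producer instance can serve a caller that passes 0 on a latency-critical path and another caller that is willing to wait a few seconds, which a single max.block.ms config cannot express.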
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627190#comment-14627190 ]

Joel Koshy commented on KAFKA-1835:

bq. Similar proposal applies to KAFKA-2275

https://issues.apache.org/jira/browse/KAFKA-2275?focusedCommentId=14627186&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14627186
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627375#comment-14627375 ]

Joel Koshy commented on KAFKA-1835:

bq. I think case (3) is only for users who are willing to drop messages if topic metadata is not present

I'm not sure I completely agree with the above. There are applications that care about having completely non-blocking APIs, as evidenced by plenty of emails on the list. However, that does not mean such applications are willing to drop messages if metadata is not present at the time of the call. This is the case for event-driven programming (think node-js style), where the application can go do something else in its event processing loop if a library call returns a future that isn't ready at that specific moment.

bq. the behavior of partitionsFor(empty-list) as proposed in KAFKA-2275 is also not clear

Agreed that it can be unclear, i.e., we should probably still have a listTopics API.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627395#comment-14627395 ]

Jay Kreps commented on KAFKA-1835:

Changing the return type of a method is a hard compatibility break, though, right? It will be impossible for code to work with both prior and future versions. From my point of view that kind of thing is a non-starter: imagine trying to roll that out in a large org.

[~jjkoshy] I agree with your use case, but I think [~guozhang] is right that it does seem more natural for people who want to guarantee non-blocking usage to set max.block.ms=0 and have send throw an exception even in that case, right? If you get the exception you would go back into your event loop and do your other stuff.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627263#comment-14627263 ]

Mayuresh Gharat commented on KAFKA-1835:

Either way the user will have to block or handle exceptions. Returning a future, as Becket said, looks good. But does that mean we tell users that they should not do a send() until the future has returned?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627400#comment-14627400 ]

Guozhang Wang commented on KAFKA-1835:

Joel, I think you are referring to some pattern like:

{code}
init() {
  Future.. future = partitionsFor(topic);
}

handle(req) {
  if (future.ready())
    // send this message related to req and all other buffered ones
  else
    // buffer this event for sending
}
{code}

for event-driven programming, right? But I think in this case users would usually not bother checking whether the topic metadata is available through partitionsFor, since they are not in a hurry to send the messages anyway, but would rather try-send-otherwise-do-sth, like:

{code}
handle(req) {
  try {
    producer.send(req + previously-buffered messages)
  } catch {
    // not successful, either drop it or buffer it for next send call
  }
}
{code}

I guess what I am trying to argue here is that for users who really want completely non-blocking operations, holding a Future object at hand and checking it periodically makes little difference compared with just handling exceptions up front. The same argument stands for buffer exhaustion as well: users usually do not prefer to check a variable indicating whether the buffer is full every time they want to send something without blocking, but rather configure the producer to throw exceptions directly and handle them.
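The try-send-otherwise-buffer pattern in the second snippet can be written out as runnable code. The following is a sketch under assumptions: `TrySendHandler` is a hypothetical name, and the `sender` consumer stands in for a non-blocking producer.send() that throws an unchecked exception when metadata (or buffer space) is unavailable.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical event handler; the sender stands in for a non-blocking producer.send().
final class TrySendHandler {
    private final Deque<String> buffered = new ArrayDeque<>();
    private final Consumer<String> sender;

    TrySendHandler(Consumer<String> sender) {
        this.sender = sender;
    }

    // Called from the event loop; never blocks.
    void handle(String message) {
        buffered.addLast(message);
        // Drain in arrival order so previously buffered messages go out first.
        while (!buffered.isEmpty()) {
            try {
                sender.accept(buffered.peekFirst());
            } catch (RuntimeException notReady) {
                return; // keep the remaining messages for the next call
            }
            buffered.removeFirst();
        }
    }

    int pending() {
        return buffered.size();
    }
}
```

An application that prefers dropping over buffering would simply discard the message in the catch block instead of returning with it still queued.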
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627474#comment-14627474 ]

Joel Koshy commented on KAFKA-1835:

My earlier comment was more to clarify the reasoning, i.e., the statement that users who want non-blocking behavior are okay with losing messages, which isn't necessarily true. I'm actually not opposed to the approach of going with max.block.ms == 0 for the non-blocking case and throwing an exception when metadata is not ready. I thought we even covered that option at the hangout (but Becket wasn't terribly excited about exposing any metadata-related exception in the first send to the user). From my p.o.v. I don't think that's an issue, since users cannot really escape from knowing about partitions and thus metadata.

This (future API) was just an alternate approach that came out of a conversation with [~onurkaraman] in KAFKA-2275. We have a need for list-topics (for some use cases such as ETL). We could just go with a blocking API there, or a config-driven (max.block.ms type) approach as we are currently considering in the producer. Now that we are working through the change in the consumer API to make the commit return a future, we felt this could benefit from a similar pattern. And if we are considering it for the consumer, it would be beneficial to have an identical API in the producer that also helps address the blocking concerns that people have raised.

Personally I like the future-oriented API since (i) it makes it clear to the caller that some asynchronous handling is involved and the result can be collected from the future, and (ii) it has clear semantics on blocking: the user can decide (at runtime) whether (or how much) blocking is acceptable. The other approach, going with max.block.ms==0 and throwing an exception, would also work, but to be equally clear in the API, the API should throw a checked exception (which is again an API change).

Yes, it involves an API change, but I don't think we want to be completely closed to changes in the API of the new (okay, one year old) producer if everyone agrees that a better and clearer API exists. I agree this is a pain and should be weighed carefully. The barrier for change is much lower on the new consumer, but I would be surprised if the majority of users were opposed to any API changes in the new producer even at this stage, as long as they are in the right direction and older APIs are deprecated over time.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14627485#comment-14627485 ]

Jiangjie Qin commented on KAFKA-1835:

My example for use case (3) is probably a bad one... However, partitionsFor() might also be called independently, and not only for sending data, right? So if a user wants a completely non-blocking API, letting partitionsFor() block would not work. I think the main benefit of having partitionsFor() return a future is that it can cover both use cases (2) and (3) dynamically, which have opposite blocking requirements on partitionsFor().

Not sure if it is just my imagined use case, but let's say that in use case (3) the user also wants to pre-initialize the metadata, but after that does not want to block on any call. Then they can do something like this:

{code}
...
partitionsFor(foo).get();
try {
  send();
  ... // some other logic
  Future.. partitionInfo = partitionsFor();
  if (partitionInfo.isDone()) {
    ... // do something with partition info.
  }
  ...
} catch (TimeoutException te) {
  // handle exception
}
{code}

So I guess returning a future gives the user the maximum freedom. That said, throwing an exception for non-blocking calls and letting the user handle it also sounds reasonable to me.

Functionality-wise, I don't think there is much difference between the above proposal and having an additional configuration, topic.metadata.preinit.list. The benefit of the configuration approach may be that it is a smaller and backward-compatible change. Do you see any issue with adding that configuration?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14624382#comment-14624382 ]

Stefan Miklosovic commented on KAFKA-1835:

[~becket_qin] Anything that would unblock me would be awesome. I am in the process of moving to newer Kafka releases (I am currently on 0.8.1.1 because of this issue). If this is not resolved, I will have to stick with the old Scala producer and rewrite only the consumers to use the new 0.8.3 API. While this could work, I do not like the approach: I want the new clients on both the producer and the consumer side, and I want to get rid of the Scala dependencies throughout the project.

Kafka new producer needs options to make blocking behavior explicit

Key: KAFKA-1835
URL: https://issues.apache.org/jira/browse/KAFKA-1835
Project: Kafka
Issue Type: Improvement
Components: clients
Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
Reporter: Paul Pearcy
Fix For: 0.8.3
Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
Original Estimate: 504h
Remaining Estimate: 504h

The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.

Discussing on the mailing list, the most viable option is to have the following settings:

pre.initialize.topics=x,y,z
pre.initialize.timeout=x

This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.

There is the question of what to do when initialization fails. There are a couple of options that I'd like available:

- Fail creation of the client
- Fail all sends until the meta is available

Open to input on how the above option should be expressed.

It is also worth noting that more nuanced solutions exist that could work without the extra settings; they just end up having extra complications and, at the end of the day, not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound, due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.

I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in Scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
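The "fail all sends until the meta is available" option, tracked with an atomic boolean as the description suggests, could look roughly like the sketch below. Everything here is hypothetical: `PreInitGate`, `preInitialize`, and the `metadataLoader` callback are illustrative names, not part of the Kafka client, and the timeout is only checked between topics rather than interrupting a slow lookup.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical gate that tracks whether pre-initialization has finished. */
class PreInitGate {
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /**
     * Tries to load metadata for every topic within timeoutMs.
     * metadataLoader is a stand-in for a real metadata fetch; it returns true
     * once metadata for the given topic is available. The deadline is only
     * checked between topics, so a single slow lookup can overrun it.
     */
    boolean preInitialize(List<String> topics, long timeoutMs, Predicate<String> metadataLoader) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        for (String topic : topics) {
            if (System.nanoTime() > deadline || !metadataLoader.test(topic)) {
                return false; // caller decides: fail construction or fail sends
            }
        }
        initialized.set(true);
        return true;
    }

    /** "Fail all sends until the meta is available": reject instead of blocking. */
    void checkReady() {
        if (!initialized.get()) {
            throw new IllegalStateException("metadata not initialized yet");
        }
    }
}
```

A caller that prefers the other option, "fail creation of the client", would throw from its constructor when `preInitialize` returns false instead of deferring the failure to `checkReady()`.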
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625781#comment-14625781 ]

Jiangjie Qin commented on KAFKA-1835:

[~ewencp] I agree that handling exceptions is something users have to do. But telling users they are guaranteed to receive an exception for a valid configuration sounds a bit awkward to me. I think it would be better to raise an exception only when something has really gone wrong, instead of asking users to handle false alarms.

WRT the stale metadata, I agree with you that we should let the user know immediately if a metadata refresh failed (from this point of view, we should actually try to fetch metadata from the bootstrap servers upon client instantiation instead of doing it later, because the bootstrap servers might not even be connectable), but we might want to be very careful about failing sends if we can still deliver them. This looks more like a design decision than a bug to me. One argument is that we should let the user know immediately if something goes wrong. On the other hand, we want to deliver messages if possible instead of simply dropping them on the floor. So maybe we can append the messages but throw an exception saying that the metadata is outdated.

Also, I think it might be worth thinking about what kind of exceptions we want to expose to the user. For instance, if a partition of a topic is offline, should we throw an exception in send(), or should we just send messages to other available partitions? If the user is sending keyed messages, the answer is obvious, but what if the messages are non-keyed?

Thanks for the feedback [~stevenz3wu]. I guess in your case you are producing messages to a changing topic set. In that case, it is necessary to deal with the exception during producing if the metadata timeout is set to 0. But for people who are producing to a single fixed topic, metadata should supposedly not be lost after the first successful metadata fetch. If it is lost, that would indicate a big problem, such as a partition going offline.
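The keyed versus non-keyed question above can be made concrete with a small sketch. It assumes a hypothetical `AvailabilityAwareChooser` (not a Kafka class) and uses `String.hashCode()` in place of whatever hash a real partitioner would apply: keyed records must stay on their hashed partition even if it is offline, while non-keyed records are free to go to any partition that is currently available.

```java
import java.util.List;
import java.util.Random;

/** Hypothetical partition chooser; illustrates the keyed vs. non-keyed trade-off only. */
class AvailabilityAwareChooser {
    private final Random random;

    AvailabilityAwareChooser(Random random) {
        this.random = random;
    }

    /**
     * Keyed records hash over all partitions so that a key always maps to the
     * same partition. Non-keyed records (key == null) pick randomly among the
     * partitions that are currently available.
     */
    int choose(String key, int numPartitions, List<Integer> availablePartitions) {
        if (key != null) {
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (availablePartitions.isEmpty()) {
            throw new IllegalStateException("no partition is currently available");
        }
        return availablePartitions.get(random.nextInt(availablePartitions.size()));
    }
}
```

With this split, an offline partition only forces an error (or a delay) for keyed records that hash to it; whether that is the right policy is exactly the open design question in the comment.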
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625763#comment-14625763 ]

Steven Zhen Wu commented on KAFKA-1835:

[~ewencp] As a user, I don't mind handling the error if metadata is not available or the buffer is full. But fail fast and don't block my thread, because the API is advertised as non-blocking/async.

[~becket_qin] We did a workaround exactly as you described. Our goal is to never block the caller thread. So if we have seen metadata for a topic, we go to the fast lane and call send() directly. Otherwise, we put the message into a queue; a background thread then drains the queue and checks partitionsFor(...), which can block.
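A minimal sketch of the fast-lane/queue workaround described above. It is written against a hypothetical `ProducerLike` interface rather than the real `KafkaProducer`, so the class names, the bounded queue, and the drop-when-full policy are all assumptions made for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the two producer calls the workaround relies on. */
interface ProducerLike {
    void send(String topic, String value); // assumed not to block once metadata is cached
    void partitionsFor(String topic);      // may block while metadata is fetched
}

class NonBlockingSender {
    private final ProducerLike producer;
    private final Set<String> knownTopics = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<String[]> pending;

    NonBlockingSender(ProducerLike producer, int capacity) {
        this.producer = producer;
        this.pending = new LinkedBlockingQueue<>(capacity);
    }

    /** Never blocks the caller; returns false if the record had to be dropped. */
    boolean send(String topic, String value) {
        if (knownTopics.contains(topic)) {
            producer.send(topic, value); // fast lane: metadata already seen
            return true;
        }
        // Slow lane: hand over to the background thread, drop when the queue is full.
        return pending.offer(new String[] {topic, value});
    }

    /**
     * One step of the background thread: handles a single queued record and
     * returns false if nothing was queued. A real drain loop would use take()
     * or a timed poll() instead of spinning.
     */
    boolean drainOne() {
        String[] record = pending.poll();
        if (record == null) {
            return false;
        }
        producer.partitionsFor(record[0]); // the blocking metadata lookup happens here
        knownTopics.add(record[0]);
        producer.send(record[0], record[1]);
        return true;
    }
}
```

One consequence of this design is that ordering across the two lanes is not preserved: a record sent through the fast lane can overtake records for the same topic that are still waiting in the queue.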
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625837#comment-14625837 ]

Ewen Cheslack-Postava commented on KAFKA-1835:

[~becket_qin] Agreed that guaranteeing an error on the first send is awkward. That's why I said that behavior would be perversely good behavior, only because it forces users to handle that type of error :) Then again, if you do something like start a metadata fetch upon instantiation, the time between instantiation and first send could be arbitrary, and oftentimes might be extremely small. So even starting a fetch then may still result in the same error very commonly and wouldn't significantly change the behavior.

Your response to the stale metadata question is interesting because the end result is enqueue, but notify of error. I think that is behavior that [~stevenz3wu] would probably also be happy with in the case of first send -- enqueue the data without partitioning, but notify of the error. Not saying that's the *right* solution, just that it's a solution that would be symmetric in both cases and satisfy the non-blocking constraint.

The point about unkeyed messages is really interesting -- it's a good point that there's really no good reason to indefinitely delay those messages just because we chose their partitions arbitrarily and that partition happens to be offline. But I'm not sure tracking that subset of messages and separately re-partitioning them so they can get sent out is worth the overhead and complexity of tracking all that extra info. Then again, if your application is only sending unkeyed messages, it could be pretty beneficial to enable resending to other partitions (and support a random partitioner that ignores known-unavailable partitions).

In any case, this is a giant tangent (my bad...). Coming back to the original issue, I think with the proper explanation, the behavior of failing on the first send isn't that unintuitive. The short version is:

* KafkaProducer will only queue records when it knows the partition (and therefore, indirectly, the broker) the data is destined for. When it starts, the producer has no information about any topics and therefore cannot enqueue any data. Initial requests to send records will fail, but trigger requests for this metadata, and after it is received all subsequent send() calls will succeed, assuming there is enough queue space.

The long version requires explaining that:

* Figuring out which partition a message should be sent to requires some information about the topic (such as the number of partitions).
* By setting a 0 or very small max.block.ms, you have given us basically no time to look this information up.
* Queuing records before we know what partition they are destined for adds an extra layer of queuing and complexity.
* If you just created the producer, we've had little time to get the info we need. Therefore, to avoid an extra layer of queuing, you will see an error. If you are willing to accept a small *potential* delay, which might average XX ms for common configurations, you would not normally see this error. If you absolutely need to not block for XX ms, then you should handle this error.

I think that in practice, this is probably a good compromise. People who *really* understand what's going on can get the behavior they want, but have to jump through a couple of extra hoops, including setting the right configs and handling errors that most users would be unlikely to see. The vast majority of users, who don't care about blocking a bit, just leave the default settings and never notice that the producer blocks on the first send unless they have a really long outage where they can't fetch metadata. In other words, while the completely non-blocking case isn't ideal, I think that since it would require a very specific configuration change, it won't affect most users, and so the somewhat odd behavior is acceptable given clear documentation.
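The short version above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyMetadataCache` is a made-up class, `String.hashCode()` stands in for the producer's real key hashing, and the background fetch is simulated by calling `update` by hand. It shows why a partition cannot be chosen before the partition count is known, and how a failed first attempt can still trigger the lookup that lets later attempts succeed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the behavior described above; not the real KafkaProducer logic. */
class ToyMetadataCache {
    private final Map<String, Integer> partitionCounts = new ConcurrentHashMap<>();
    private final Set<String> requested = ConcurrentHashMap.newKeySet();

    /** Picks a partition for a keyed record, or fails fast if the topic is unknown. */
    int partitionFor(String topic, String key) {
        Integer count = partitionCounts.get(topic);
        if (count == null) {
            requested.add(topic); // remember to fetch metadata in the background
            throw new IllegalStateException("no metadata for topic " + topic);
        }
        return Math.floorMod(key.hashCode(), count);
    }

    /** Called by the (hypothetical) background I/O thread when metadata arrives. */
    void update(String topic, int numPartitions) {
        partitionCounts.put(topic, numPartitions);
        requested.remove(topic);
    }

    /** True while a metadata fetch for the topic is outstanding. */
    boolean fetchRequested(String topic) {
        return requested.contains(topic);
    }
}
```

A caller running with a zero block time would catch the exception from the first attempt, retry or drop the record, and expect later attempts to succeed once `update` has run.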
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625234#comment-14625234 ]

Jiangjie Qin commented on KAFKA-1835:

[~guozhang][~jkreps] What do you think about this? The scenario we want to solve is that the user doesn't want send() to block in any way. Assuming KIP-19 is done, the user will set max.block.ms to 0. In that case, the problem becomes how the first send() can get through. I am thinking maybe we can do a metadata refresh when the client is instantiated. There might be some overhead because this will fetch the metadata of all topics, but I don't think this will be a big issue. Any thoughts?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625568#comment-14625568 ]

Guozhang Wang commented on KAFKA-1835:

[~smiklosovic] You will only be blocked for the full timeout period if the topic your producer is trying to send to is not available, or if the destination broker cluster is not available (which is a much bigger problem). In those cases the producer client blocks on refreshing the metadata for that topic; if the topic already exists, the producer should not block for that long. What is the scenario you have encountered?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625634#comment-14625634 ]

Jiangjie Qin commented on KAFKA-1835:

Hey [~guozhang], I am worried about the case where the user wants strictly non-blocking sends. In that case they will set max.block.ms to 0. They will therefore always get an exception for the first send(), because there is no metadata and the user decided not to wait at all. Currently a workaround is to call partitionsFor() on the topic first to force a metadata refresh, and then start sending. But in KIP-19, partitionsFor() will also use max.block.ms, so the user will not be able to force a metadata refresh before sending data.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14625674#comment-14625674 ]

Ewen Cheslack-Postava commented on KAFKA-1835:

[~becket_qin] I agree that for a user this doesn't look great when they first start using the API and switch it to be fully non-blocking. Although, in a perverse way, it may be pretty good behavior for those users -- it forces them to actually handle that exception properly when it occurs, because they need to handle it to get any data sent out. This means they should be robust, to some degree, to both a failed metadata fetch and a buffer-full condition.

And I'm not convinced this behavior is unreasonable. Right now we continue to use the metadata we have for partitioning even if it has hit the max age. I'd argue that as soon as it hits metadata.max.age.ms and we can't get an update, we could very reasonably start throwing the same error on send, because we can't be certain the partitioning is still valid since the number of partitions could have changed. In fact, perhaps that's another bug? Given a connectivity issue with the cluster, the producer could partition incorrectly for an arbitrarily long time. It's also limited by the buffer size, so in most cases it probably wouldn't be an issue, but it seems like bad behavior nonetheless.
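The stricter policy floated above (start failing sends once the metadata is older than metadata.max.age.ms and no refresh has succeeded) could be sketched as follows. `MetadataFreshness` is a hypothetical helper, not producer code; per the comment, the producer at the time kept using old metadata. Timestamps are passed in explicitly so the check is deterministic, and the class is not thread-safe.

```java
/** Hypothetical staleness check for cached topic metadata. */
class MetadataFreshness {
    private final long maxAgeMs;          // plays the role of metadata.max.age.ms
    private long lastSuccessfulRefreshMs; // time of the last refresh that succeeded

    MetadataFreshness(long maxAgeMs, long nowMs) {
        this.maxAgeMs = maxAgeMs;
        this.lastSuccessfulRefreshMs = nowMs;
    }

    /** Call whenever a metadata refresh completes successfully. */
    void recordRefresh(long nowMs) {
        lastSuccessfulRefreshMs = nowMs;
    }

    /** Metadata is stale once it is strictly older than the configured max age. */
    boolean isStale(long nowMs) {
        return nowMs - lastSuccessfulRefreshMs > maxAgeMs;
    }

    /** Fails the send instead of partitioning with possibly outdated metadata. */
    void checkBeforeSend(long nowMs) {
        if (isStale(nowMs)) {
            throw new IllegalStateException("metadata older than " + maxAgeMs + " ms");
        }
    }
}
```

The softer alternative raised elsewhere in the thread, appending the record but still reporting the error, would call `isStale` and notify the caller rather than throwing from `checkBeforeSend`.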
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14624238#comment-14624238 ]

Jiangjie Qin commented on KAFKA-1835:

Maybe we can force a metadata refresh upon producer creation. The downside is that this will essentially fetch the metadata of all the topics in a cluster. I am not sure how much overhead this would create, but I guess it is likely OK. After KIP-19 is checked in, users can set the block timeout to 0 and get non-blocking sends.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14623837#comment-14623837 ]

Stefan Miklosovic commented on KAFKA-1835:

I am still hitting this issue. I need to move to 0.8.3 because of the new committing features in the consumer, but I cannot send any messages to topics because the send times out with this error. I am quite desperate; wrapping the call in a thread does not work at all ... Will this ever be released in 0.8.3, please?
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519105#comment-14519105 ]

Stefan Miklosovic commented on KAFKA-1835:
------------------------------------------

What I am basically able to achieve by wrapping that producer in a Callable is that it outputs this log from Kafka's ZooKeeper:

[2015-04-29 12:26:47,774] INFO Got user-level KeeperException when processing sessionid:0x14d04b66e8e type:create cxid:0x24f zxid:0x25 txntype:-1 reqpath:n/a Error Path:/brokers/topics/mytopic/partitions/4 Error:KeeperErrorCode = NoNode for /brokers/topics/mytopic/partitions/4 (org.apache.zookeeper.server.PrepRequestProcessor)

This repeats for every partition and every topic I previously created; however, it times out in the end anyway. Is there any configuration option I should set?

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2.0, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.3
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The new (0.8.2 standalone) producer will block the first time it attempts to retrieve metadata for a topic. This is not the desired behavior in some use cases where async non-blocking guarantees are required and message loss is acceptable in known cases. Also, most developers will assume an API that returns a future is safe to call in a critical request path.
>
> Discussing on the mailing list, the most viable option is to have the following settings:
>
>   pre.initialize.topics=x,y,z
>   pre.initialize.timeout=x
>
> This moves potential blocking to the init of the producer and outside of some random request. The potential will still exist for blocking in a corner case where connectivity with Kafka is lost and a topic not included in pre-init has a message sent for the first time.
>
> There is the question of what to do when initialization fails. There are a couple of options that I'd like available:
> - Fail creation of the client
> - Fail all sends until the meta is available
>
> Open to input on how the above option should be expressed.
>
> It is also worth noting more nuanced solutions exist that could work without the extra settings, they just end up having extra complications and at the end of the day not adding much value. For instance, the producer could accept and queue messages (note: more complicated than I am making it sound due to storing all accepted messages in pre-partitioned compact binary form), but you're still going to be forced to choose to either start blocking or dropping messages at some point.
>
> I have some test cases I am going to port over to the Kafka producer integration ones and start from there. My current impl is in scala, but porting to Java shouldn't be a big deal (was using a promise to track init status, but will likely need to make that an atomic bool).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
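For readers following the thread, the "fail all sends until the meta is available" option from the issue description could be sketched roughly as below. This is only an illustration under stated assumptions, not the attached patch: GuardedSender and its two Consumer arguments are hypothetical stand-ins for a blocking metadata lookup and for the real send call, and the AtomicBoolean mirrors the "atomic bool" the reporter mentions.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Sketch: reject sends until a background metadata warm-up has finished. */
final class GuardedSender {
    // Written by the warm-up thread and read by sending threads, hence atomic.
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final Consumer<String> send;           // hypothetical stand-in for the real send
    private final CompletableFuture<Void> warmUp;

    GuardedSender(List<String> topics, Consumer<String> fetchMetadata, Consumer<String> send) {
        this.send = send;
        // Warm up off the caller's thread so construction never blocks.
        this.warmUp = CompletableFuture.runAsync(() -> {
            for (String topic : topics) {
                fetchMetadata.accept(topic);       // hypothetical blocking metadata lookup
            }
            initialized.set(true);                 // only reached if every lookup succeeded
        });
    }

    /** Returns false (message dropped) instead of blocking while metadata is missing. */
    boolean trySend(String message) {
        if (!initialized.get()) {
            return false;
        }
        send.accept(message);
        return true;
    }

    /** Completes when the warm-up ends; exposed so callers can observe failures. */
    CompletableFuture<Void> warmUp() {
        return warmUp;
    }
}
```

If a lookup throws, the flag stays false and every trySend keeps returning false, which is the "message loss is acceptable" trade-off the description talks about.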
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14519506#comment-14519506 ]

David Hay commented on KAFKA-1835:
----------------------------------

Fixed some syntax errors in my code sample. This example isn't intended to prevent the timeout problems getting the broker metadata. Instead, it is intended to isolate the producer from the lag required to actually send a message to the Kafka brokers.
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14518184#comment-14518184 ]

David Hay commented on KAFKA-1835:
----------------------------------

[~smiklosovic] I've worked around the issue by creating my own thread pool and wrapping the producer send requests with my own Callable, as follows. This also uses Guava's ListenableFuture class:

{code:language=java}
// Futures.dereference needs a ListenableFuture, so the pool is wrapped in a
// ListeningExecutorService (a plain ExecutorService only returns Future).
ListeningExecutorService executorService =
    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));

ListenableFuture<RecordMetadata> future = Futures.dereference(
    executorService.submit(new Callable<ListenableFuture<RecordMetadata>>() {
        public ListenableFuture<RecordMetadata> call() throws Exception {
            final SettableFuture<RecordMetadata> responseFuture = SettableFuture.create();
            try {
                // send() may block on the metadata fetch, but only this pool thread waits.
                producer.send(new ProducerRecord(topic, key, message), new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception ex) {
                        if (ex == null) {
                            responseFuture.set(metadata);
                        } else {
                            responseFuture.setException(ex);
                        }
                    }
                });
            } catch (Exception ex) {
                responseFuture.setException(ex);
            }
            return responseFuture;
        }
    }));
{code}
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14517627#comment-14517627 ]

Stefan Miklosovic commented on KAFKA-1835:
------------------------------------------

Is there actually any workaround? I am using the new producer in a vert.x verticle, and it blocks and times out after 60 seconds because it cannot fetch the metadata. What should I do?
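One partial mitigation for the 60-second stall, separate from the thread-pool workaround posted elsewhere in this thread, is to shorten how long the first send may wait for metadata. The sketch below only builds the java.util.Properties that would be handed to the producer. It assumes the 0.8.2-era setting metadata.fetch.timeout.ms (default 60000 ms, which matches the timeout reported above); the ProducerSettings class and its method name are made up for illustration. The value must stay positive, since a zero timeout leaves no time for the fetch at all.

```java
import java.util.Properties;

/** Sketch: producer settings that bound the metadata wait on the first send. */
final class ProducerSettings {
    static Properties boundedBlocking(String bootstrapServers, long metadataTimeoutMs) {
        if (metadataTimeoutMs <= 0) {
            // A zero timeout gives the first metadata fetch no chance to complete.
            throw new IllegalArgumentException("metadataTimeoutMs must be > 0");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound on how long send() waits for topic metadata before failing.
        props.put("metadata.fetch.timeout.ms", Long.toString(metadataTimeoutMs));
        return props;
    }
}
```

This shortens the block rather than removing it; the calling thread still waits up to the configured time.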
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505238#comment-14505238 ]

David Hay commented on KAFKA-1835:
----------------------------------

This solution doesn't seem ideal to me. It requires an update to {{pre.initialize.topics}} every time we add a new topic to our system. Otherwise, if I publish to a topic that is not in the list, then the behavior is the same as now: blocking until the metadata is returned the first time.

Ideally, as I mentioned in KAFKA-2137, the metadata refresh would happen in a background thread. Perhaps a better solution would be to have the entire body of the {{send(ProducerRecord, Callback)}} method run in a separate thread (or thread pool)?

Alternatively, is there a way to submit the request to the Sender without (yet) knowing what partition we want to send to?
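The idea of running the whole send path on a separate thread pool can be sketched with the JDK alone. This is an illustration of the suggestion, not how the Kafka client is implemented: OffloadedSender is a hypothetical wrapper, and the blockingSend function stands in for whatever call may block on metadata (for example a producer send followed by waiting on its future).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Sketch: run a possibly blocking send on a dedicated pool so callers never block. */
final class OffloadedSender<R> {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    // Hypothetical stand-in for the call that may block on a metadata fetch.
    private final Function<String, R> blockingSend;

    OffloadedSender(Function<String, R> blockingSend) {
        this.blockingSend = blockingSend;
    }

    /** Returns immediately; failures surface through the returned future. */
    CompletableFuture<R> send(String message) {
        return CompletableFuture.supplyAsync(() -> blockingSend.apply(message), pool);
    }

    /** Stops accepting work; already submitted sends still run. */
    void close() {
        pool.shutdown();
    }
}
```

Note that Executors.newFixedThreadPool queues work without bound, so during a long outage pending sends pile up in memory. That is the same "start blocking or start dropping at some point" choice the issue description raises, only moved into the wrapper.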
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286967#comment-14286967 ]

Ewen Cheslack-Postava commented on KAFKA-1835:
----------------------------------------------

[~ppearcy] these are easier to review if they're on reviewboard -- might help to use the patch submission tool in the future. Here are some notes on the current patch:

KafkaProducer.java
* No need to use the object forms of primitive types; change Boolean -> boolean, Long -> long, etc.
* initialized should be an AtomicBoolean or volatile boolean since it's read/written from different threads.
* Error handling when waiting for the Future to finish seems wrong -- if there is an exception, we probably want to pass it along/throw another one to indicate the problem to the caller. Currently it just falls through and then only throws an exception when send() is called, so the error ends up disconnected from the source of the problem. It seems like it would be better to just handle the error immediately.
* Similarly, I don't think send() should check initialized if preinitialization is handled in the constructor -- if failure to preinitialize also threw an exception, then it would be impossible to call send() unless preinitialization was complete.
* If you follow the above approach, you can avoid making initialized a field in the class. It would only need to be a local variable since it would only be used in the constructor.
* Do we even need the ExecutorService? Since the thread creating the producer is going to block by calling Future.get(), what does having the executor accomplish?
* initializeProducer() doesn't need a return value since it only ever returns true.

ProducerConfig.java
* Config has a getList() method and ConfigDef has a LIST type. Use those for pre.initialize.topics instead of parsing the list yourself.
* I think the docstrings could be better, e.g.:

pre.initialize.topics: List of topics to preload metadata for when creating the producer so subsequent calls to send are guaranteed not to block. If metadata for these topics cannot be loaded within <code>pre.initialize.timeout.ms</code> milliseconds, the producer constructor will throw an exception.

pre.initialize.timeout.ms: The producer blocks when sending the first message to a topic if metadata is not yet available for that topic. When this configuration is greater than 0, metadata for the topics specified by <code>pre.initialize.topics</code> are prefetched during construction, throwing an exception after <code>pre.initialize.timeout.ms</code> milliseconds if the metadata has not been populated.

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
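The constructor-side review points (fail immediately, keep the cause attached, no initialized field, no executor) could look roughly like the sketch below. It is not the patch under review: PreinitializedClient and its nested MetadataFetcher interface are hypothetical stand-ins for the producer and for a metadata lookup that accepts a timeout, and the topic list is assumed to arrive already parsed, as it would from a LIST-typed config.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch: construction either preloads metadata for every topic or fails at once. */
final class PreinitializedClient {

    /** Hypothetical blocking lookup; throws if metadata is missing after timeoutMs. */
    interface MetadataFetcher {
        void await(String topic, long timeoutMs) throws TimeoutException;
    }

    PreinitializedClient(List<String> topics, long timeoutMs, MetadataFetcher fetcher) {
        // One deadline shared by all topics, checked on the constructing thread.
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (String topic : topics) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            try {
                if (remainingMs <= 0) {
                    throw new TimeoutException("deadline passed before fetching " + topic);
                }
                fetcher.await(topic, remainingMs);
            } catch (TimeoutException e) {
                // Fail here, with the cause attached, rather than later inside send().
                throw new IllegalStateException(
                    "Could not preinitialize topic " + topic + " within " + timeoutMs + " ms", e);
            }
        }
        // Reaching this point means every topic is ready, so send() needs no flag check.
    }
}
```

Because the constructor throws on failure, no instance exists unless preinitialization completed, which is why the initialized flag can go away entirely.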
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286816#comment-14286816 ]

Paul Pearcy commented on KAFKA-1835:
------------------------------------

Do I need to do anything else to get this in the review pipeline?

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14287035#comment-14287035 ]

Paul Pearcy commented on KAFKA-1835:
------------------------------------

Created reviewboard https://reviews.apache.org/r/30158/diff/ against branch origin/trunk

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h
[jira] [Commented] (KAFKA-1835) Kafka new producer needs options to make blocking behavior explicit
[ https://issues.apache.org/jira/browse/KAFKA-1835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14287042#comment-14287042 ]

Paul Pearcy commented on KAFKA-1835:
------------------------------------

Thanks Ewen. I created a review, added your comments, and will follow up.

> Kafka new producer needs options to make blocking behavior explicit
> -------------------------------------------------------------------
>
>                 Key: KAFKA-1835
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1835
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>    Affects Versions: 0.8.2, 0.8.3, 0.9.0
>            Reporter: Paul Pearcy
>             Fix For: 0.8.2
>
>         Attachments: KAFKA-1835-New-producer--blocking_v0.patch, KAFKA-1835.patch
>
>   Original Estimate: 504h
>  Remaining Estimate: 504h