[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364205#comment-14364205 ] Anatoli Fomenko commented on KAFKA-2023:

Works for me on MacOS but not on one CentOS 6.5 host. Perhaps these words would work better: if you see the error "RPC failed; result=22, HTTP code = 405", use:

git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka

git clone kafka repository requires https
-
Key: KAFKA-2023
URL: https://issues.apache.org/jira/browse/KAFKA-2023
Project: Kafka
Issue Type: Bug
Components: website
Reporter: Anatoli Fomenko
Priority: Minor
Attachments: KAFKA-2023.patch

From http://kafka.apache.org/code.html: Our code is kept in git. You can check it out like this: git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka

On CentOS 6.5:
{code}
$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
error: RPC failed; result=22, HTTP code = 405
{code}
while:
{code}
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
Resolving deltas: 100% (14449/14449), done.
{code}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2024) Cleaner can generate unindexable log segments
Gian Merlino created KAFKA-2024:
---
Summary: Cleaner can generate unindexable log segments
Key: KAFKA-2024
URL: https://issues.apache.org/jira/browse/KAFKA-2024
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.0
Reporter: Gian Merlino

It's possible for log cleaning to generate segments that have a gap of more than Int.MaxValue between their base offset and their last offset. It's not possible to index those segments, since there are only 4 bytes available to store that difference. The broker ends up writing overflowed ints into the index, and doesn't detect that there is a problem until restarted, at which point you get one of these:

2015-03-16 20:35:49,632 FATAL [main] kafka.server.KafkaServerStartable - Fatal error during KafkaServerStartable startup. Prepare to shutdown
java.lang.IllegalArgumentException: requirement failed: Corrupt index found, index file (/mnt/persistent/kafka-logs/topic/.index) has non-zero size but the last offset is -1634293959 and the base offset is 0
at scala.Predef$.require(Predef.scala:233)
at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:204)
at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:203)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.log.Log.loadSegments(Log.scala:203)
at kafka.log.Log.<init>(Log.scala:67)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:142)
at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
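The overflow described above can be reproduced with a small sketch (the concrete offsets are illustrative; the offset index stores the delta from the base offset in a signed 32-bit slot):

```python
def relative_offset(base_offset: int, last_offset: int) -> int:
    """Store (last_offset - base_offset) the way a 4-byte index slot would."""
    delta = (last_offset - base_offset) & 0xFFFFFFFF          # truncate to 32 bits
    return delta - (1 << 32) if delta >= (1 << 31) else delta  # reinterpret as signed

# A gap small enough to index is preserved:
print(relative_offset(0, 123))            # 123
# A gap beyond Int.MaxValue silently overflows into a negative value,
# matching the "last offset is -1634293959" sanity-check failure above:
print(relative_offset(0, 2_660_673_337))  # -1634293959
```

Nothing catches the wrap-around at write time; the corruption only surfaces when sanityCheck runs on restart.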
[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364404#comment-14364404 ] Anatoli Fomenko commented on KAFKA-2023:

Concur: the problem persists with git 1.7.1, and does not occur with later versions such as 2.0.4. The question is whether the site should support git 1.7.1, which is the default version on CentOS 6.
[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein updated KAFKA-2023:
-
Reviewer: Gwen Shapira
[jira] [Assigned] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joe Stein reassigned KAFKA-2023:
Assignee: Anatoly Fayngelerin
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
101. There may be a use case where you only want topics to be created manually by admins. Currently, you can do that by disabling auto topic creation and issuing topic creation from the TopicCommand. If we disable auto topic creation completely on the broker and don't have a way to distinguish between topic creation requests from regular clients and from the admin, we can't support manual topic creation any more. I was thinking that another way of distinguishing the clients making topic creation requests is the clientId. For example, the admin tool can set it to something like "admin" and the broker can treat that clientId specially.

Also, there is a related discussion in KAFKA-2020. Currently, we do the following in TopicMetadataResponse:

1. If the leader is not available, we set the partition level error code to LeaderNotAvailable.
2. If a non-leader replica is not available, we take that replica out of the assigned replica list and isr in the response. As an indication for doing that, we set the partition level error code to ReplicaNotAvailable.

This has a few problems. First, ReplicaNotAvailable probably shouldn't be an error, at least for normal producer/consumer clients that just want to find the leader. Second, it can happen that both the leader and another replica are unavailable at the same time; there is no error code to indicate both. Third, even if a replica is not available, it's still useful to return its replica id, since some clients (e.g. the admin tool) may still make use of it. One way to address this issue is to always return the replica id for the leader, assigned replicas, and isr, regardless of whether the corresponding broker is live or not. Since we also return the list of live brokers, the client can figure out whether a leader or a replica is live and act accordingly. This way, we don't need to set the partition level error code when the leader or a replica is not available.
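A sketch of what the always-return-replica-ids idea would look like from the client side (a hypothetical helper in Python for illustration, not an actual Kafka API):

```python
def partition_view(leader, assigned_replicas, isr, live_brokers):
    """Classify replicas by intersecting metadata with the live-broker list,
    instead of relying on partition-level error codes."""
    live = set(live_brokers)
    return {
        "leader_available": leader in live,
        "unavailable_replicas": [r for r in assigned_replicas if r not in live],
        "live_isr": [r for r in isr if r in live],
    }

# Leader 1 and replica 3 are both down at the same time -- both facts are
# visible to the client without any partition-level error code:
print(partition_view(1, [1, 2, 3], [1, 2], live_brokers=[2]))
```

This keeps the full replica assignment in the response, so an admin tool can still see replica ids for dead brokers.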
Always returning the replica ids doesn't change the wire protocol, but it does change the semantics. Since we are evolving the protocol of TopicMetadataRequest here, we can potentially piggyback the change.

102.1 For those types of errors due to invalid input, shouldn't we just guard against them at parameter validation time and throw InvalidArgumentException without even sending the request to the broker?

Thanks,

Jun

On Mon, Mar 16, 2015 at 10:37 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Jun,

Answering your questions:

101. If I understand you correctly, you are saying future producer versions (which will be ported to TMR_V1) won't be able to automatically create topics (if we unconditionally remove topic creation from there). But we need to preserve this logic. Ok, about your proposal: I'm not a big fan either, when it comes to differentiating clients directly in the protocol schema. And also I'm not sure I understand why auto.create.topics.enable is a server side configuration at all. Can we deprecate this setting in future versions, add it to the producer, and based on it, upon receiving UnknownTopic, create the topic explicitly by a separate producer call via AdminClient?

102.1. Hm, yes. It's because we want to support batching and at the same time we want to give descriptive error messages to clients. Since AdminClient holds the context to construct such messages (e.g. the AdminClient layer can know that InvalidArgumentsCode means two cases: either an invalid number, e.g. -1, or replication-factor was provided while the partitions argument wasn't), I wrapped responses in Exceptions. But I'm open to other ideas; this was just an initial version.

102.2. Yes, I agree. I'll change that to probably some other dto.

Thanks,
Andrii Biletskyi

On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote:

Andrii,

101. That's what I was thinking too, but it may not be that simple. In TopicMetadataRequest_V1, we can let it not trigger auto topic creation. Then, on the producer side, if it gets an UnknownTopicException, it can explicitly issue a createTopicRequest for auto topic creation. On the consumer side, it will never issue createTopicRequest. This works when auto topic creation is enabled on the broker side. However, I am not sure how things will work when auto topic creation is disabled on the broker side. In this case, we want a way to manually create a topic, potentially through admin commands. But then we need a way to distinguish createTopicRequests issued from producer clients and from admin tools. Maybe we can add a new field in createTopicRequest and set it differently in the producer client and the admin client. However, I am not sure if that's the best approach.

2. Yes, refactoring existing requests is a non-trivial amount of work. I posted some comments in KAFKA-1927. We will probably have to fix KAFKA-1927 first, before adding the new logic in KAFKA-1694. Otherwise, the changes will be too big.

102. About the
[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2013: Attachment: KAFKA-2013_2015-03-16_14:13:20.patch benchmark test for the purgatory Key: KAFKA-2013 URL: https://issues.apache.org/jira/browse/KAFKA-2013 Project: Kafka Issue Type: Test Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda Priority: Trivial Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, KAFKA-2013_2015-03-16_14:13:20.patch We need a micro benchmark test for measuring the purgatory performance.
RE: [KIP-DISCUSSION] KIP-13 Quotas
In Jay's approach, a client will simply experience a delay in receiving a response. The primary benefit is that there are no concerns regarding data loss, because the data has already been appended. Retries are also a non-issue since there is no need for them. However, the drawback to append-and-delay is that if the socket timeout is reached (30 seconds by default, I believe), the client can disconnect and try to resend the batch to the server. This will cause data duplication, since the server cannot distinguish duplicate batches. However, it is very likely that the maximum quota delay will be lower than the socket timeout unless someone explicitly overrides it. We can make this even more unlikely by having a fixed lower bound on the socket timeout (10 seconds?). In this approach we must also ignore the request timeout, since a small timeout would completely bypass quotas.

In the other approach, assuming the client only retries a fixed number of times, it will eventually experience data loss, since the producer will drop the batch at some point. IMO, it is more likely that we will see this issue in production than the other issues identified above. I agree with Jay that we can delay the request longer than the request timeout, since it isn't possible to enforce it perfectly on the server anyway. I think we should have a maximum delay config on the server that provides a ceiling on the longest we can delay a request, and have it be lower than the socket timeout.

Initially, I preferred delay-and-error because it seems like the most natural way to handle quota violations, but I'm starting to see the merit in Jay's approach. Practically speaking, it reduces the number of moving parts in delivering quotas for Kafka. All changes are localized to the broker and are compatible with existing clients. Client changes will be required only if we return quota metadata in the responses or add a quota metadata API. If we discover in production that this isn't working for some reason, we can always revisit the approach of returning errors and having the clients handle them.

Note that both of these data loss/duplication issues only affect the producer. Consumers should be fine regardless of the approach we choose.

Aditya

From: Jun Rao [j...@confluent.io]
Sent: Monday, March 16, 2015 4:27 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better.

Thanks,

Jun

On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas

Appreciate any feedback.

Aditya
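For what it's worth, the append-and-delay bookkeeping discussed in this thread can be sketched as follows (illustrative only; the function name, window model, and cap parameter are assumptions, not the actual broker code):

```python
def throttle_ms(bytes_in_window, quota_bytes_per_sec, window_ms, max_delay_ms):
    """Delay (ms) needed for the client's observed rate to fall back to its
    quota, capped by a server-side maximum that should stay below the
    socket timeout so the client doesn't disconnect and resend."""
    allowed = quota_bytes_per_sec * window_ms / 1000.0
    if bytes_in_window <= allowed:
        return 0  # under quota: respond immediately
    overage_ms = (bytes_in_window - allowed) / quota_bytes_per_sec * 1000.0
    return min(int(overage_ms), max_delay_ms)

# 15 MB observed in a 10 s window against a 1 MB/s quota -> 5 s delay:
print(throttle_ms(15_000_000, 1_000_000, 10_000, max_delay_ms=25_000))  # 5000
```

The cap is what keeps the delay from colliding with the 30-second socket timeout mentioned above.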
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364344#comment-14364344 ] Sriharsha Chintalapani commented on KAFKA-1646:
---
[~waldenchen] It looks like the patch is against the 0.8.1.1 branch; can you send us a patch against trunk?

Improve consumer read performance for Windows
-
Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Assignee: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch

This patch is for the Windows platform only. On Windows, if there is more than one replica writing to disk, the segment log files will not be laid out consistently on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.iswindow)' or adds methods used on Windows.
[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364263#comment-14364263 ] Joe Stein commented on KAFKA-2023:
--
Looks like the issue may be the version of git. I tried a few other ASF repos and hit the same issue with git 1.7.1, which is what comes with 'yum install git'.
Re: Review Request 29091: Improve 1646 fix by reduce check if Os.IsWindows
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/#review76688 ---

core/src/main/scala/kafka/utils/Utils.scala (https://reviews.apache.org/r/29091/#comment124320): Can we do setLength for all OSes, not just specifically Windows?

- Sriharsha Chintalapani

On March 13, 2015, 3:12 a.m., Qianlin Xia wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/ ---

(Updated March 13, 2015, 3:12 a.m.)

Review request for kafka.

Bugs: KAFKA-1646 (https://issues.apache.org/jira/browse/KAFKA-1646)

Repository: kafka

Description
---
Improve the KAFKA-1646 fix by reducing the Os.IsWindows checks.

Diffs
-
core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
core/src/main/scala/kafka/log/LogManager.scala 7cee5435b23fcd0d76f531004911a2ca499df4f8
core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04

Diff: https://reviews.apache.org/r/29091/diff/

Testing
---

Thanks,
Qianlin Xia
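The setLength idea under review — extending a segment file to its full size when it is rolled, so the filesystem can lay it out contiguously — behaves roughly like this sketch (Python for illustration; the real patch is in the Scala files above, and the file name below is hypothetical):

```python
import os
import tempfile

def preallocate(path: str, size: int) -> None:
    """Extend the file to the full segment size up front; the unwritten
    tail reads back as zeros, analogous to RandomAccessFile.setLength."""
    with open(path, "ab") as f:
        f.truncate(size)

# Roll a new (empty) segment and reserve 1 MiB for it:
segment = os.path.join(tempfile.mkdtemp(), "00000000000000000000.log")
open(segment, "wb").close()
preallocate(segment, 1024 * 1024)
print(os.path.getsize(segment))  # 1048576
```

Preallocating avoids the incremental extensions that fragment NTFS when several replicas append concurrently, which is the performance problem KAFKA-1646 targets.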
[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364412#comment-14364412 ] Gwen Shapira commented on KAFKA-2023:
-
If it's the default on a popular OS, I'd update the docs with this information.
Re: Review Request 31893: Patch for KAFKA-2013
On March 16, 2015, 5:17 p.m., Jun Rao wrote:

core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193 (https://reviews.apache.org/r/31893/diff/1/?file=890190#file890190line193): Is there a particular reason that we need to overwrite isCompleted()? Typically, only tryComplete() and onComplete() need to be overwritten in a subclass of DelayedOperation. Actually, I am not sure how we complete the requests before the timeout is reached, since there is no explicit call to tryComplete()?

isCompleted checks whether the current time has passed the scheduled completion time, rather than whether forceComplete has been called. This makes isCompleted always accurate. The purgatory checks the watcher lists every so often and calls isCompleted. Calling forceComplete from isCompleted ensures that a completed request is removed from the timing wheels in the new implementation. In terms of timing, this is not very accurate, because completed requests may stay longer than they should. This doesn't affect the old implementation at all, but it may impose some overhead on the new implementation. Still, the new one outperforms the old one. It would be ideal if we could call forceComplete at the scheduled completion time, but that requires another timer (DelayQueue or Timer). I think that is too much overhead for measuring purgatory performance, and it would also be hard to guarantee that such a timer works accurately in this test setting.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/#review76566 ---

On March 16, 2015, 8:23 p.m., Yasuhiro Matsuda wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/ ---

(Updated March 16, 2015, 8:23 p.m.)

Review request for kafka.
Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
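The benchmark trick described in the review above — isCompleted comparing the clock to the scheduled completion time instead of waiting for an explicit tryComplete call — can be modeled with a few lines (illustrative names; not the actual TestPurgatoryPerformance code):

```python
class ScheduledOperation:
    """isCompleted() reports true once the scheduled completion time has
    passed, completing the operation lazily when the purgatory polls it."""

    def __init__(self, due_ms: int):
        self.due_ms = due_ms
        self.forced = False

    def is_completed(self, now_ms: int) -> bool:
        if not self.forced and now_ms >= self.due_ms:
            self.forced = True  # force-complete on first poll past the due time
        return self.forced

op = ScheduledOperation(due_ms=1000)
print(op.is_completed(500))   # False: due time not yet reached
print(op.is_completed(1500))  # True: completed lazily on this poll
```

The timing imprecision noted above follows directly: the operation is only marked complete when something polls it, so it can linger in the timing wheels past its due time.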
Re: Review Request 31893: Patch for KAFKA-2013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/ --- (Updated March 16, 2015, 8:23 p.m.) Review request for kafka. Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs (updated) - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14363843#comment-14363843 ] Yasuhiro Matsuda commented on KAFKA-2013:
-
Updated reviewboard https://reviews.apache.org/r/31893/diff/ against branch origin/trunk
[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2013:
Attachment: KAFKA-2013_2015-03-16_13:23:38.patch
Re: Review Request 31742: Patch for KAFKA-527
On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:

core/src/main/scala/kafka/message/MessageWriter.scala, line 29 (https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line29): Add a check that the codec should not be NoCompression.

Yasuhiro Matsuda wrote: Why should the codec not be NoCompression? The code works with NoCompression, too.

That's right, it works with NoCompression too. My bad.

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:

core/src/main/scala/kafka/message/MessageWriter.scala, line 97 (https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line97): Could we use comments in /** * */ format?

Yasuhiro Matsuda wrote: Is this comment style prohibited? This class is for internal use with fairly localized usage.

We used to maintain a coding guide (http://kafka.apache.org/coding-guide.html), but I think we did not do a great job of enforcing it, and the page itself is also a bit outdated. Jay added the checkstyle package in order to improve on this aspect, but it does not have comment rules in it. I thought the common rules in the code are: 1. Use /* */ for class definitions and user-facing API comments. 2. Use // for in-function comments (no capitalization). But this is not programmatically enforced. Anyway, let me know if you think that is too picky, and we can try to be more flexible in terms of commenting.

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:

core/src/main/scala/kafka/message/MessageWriter.scala, line 117 (https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line117): We can just pass in the Byte here.

Yasuhiro Matsuda wrote: This is a contract of OutputStream.

Cool. Could we add the optional override here?

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote:

core/src/main/scala/kafka/message/MessageWriter.scala, line 135 (https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line135): Better to group the private functions together after the public functions.

Yasuhiro Matsuda wrote: Well, I don't think it is a particularly better way to organize code, but if you insist I can change it. The Kafka code base doesn't seem to follow that convention...

Again, we did not do a good job of enforcing any sort of coding style, and it may just be me being unreasonable about these rules. I am open to other reviewers taking a look and giving their thoughts.

On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote: The inheritance of MessageWriter from BufferingOutputStream is a bit confusing, since it will always use itself in the writePayload function parameter. I feel it is clearer to read the code if we just let MessageWriter contain a var of BufferingOutputStream; and instead of passing in the function logic of writing the message, we can just pass in the messages and offsetCounter in the write() call, which will then write the messages itself.

Yasuhiro Matsuda wrote: It is true that the current code writes only through writePayload. But I wanted MessageWriter to be a subclass of OutputStream to be more generic, in case we need to write additional information other than messages in the future.

As for now, MessageWriter's only public function is write(key, codec)(valueWriteFunction), which is used for writing a single message. Also, its private functions withCrc32Prefix / withLengthPrefix are only used for message writing. So it is a bit unclear what your motivation for future extensions is. Could you elaborate a bit more on that?

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/#review76454 ---

On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote:

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/ ---

(Updated March 4, 2015, 7:43 p.m.)

Review request for kafka.
Bugs: KAFKA-527 https://issues.apache.org/jira/browse/KAFKA-527 Repository: kafka Description --- less byte copies Diffs - core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9c694719dc9b515fb3c3ae96435a87b334044272 core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/31742/diff/ Testing --- Thanks, Yasuhiro Matsuda
Re: java.net.BindException: Address already in use
This is being tracked in KAFKA-1501. Typically, this won't happen on a dedicated machine.

Thanks,

Jun

On Mon, Mar 16, 2015 at 5:02 AM, Tong Li liton...@us.ibm.com wrote:

Hi guys, when I ran the tests, I got a lot of these exceptions. I wonder if you had similar problems running the tests and how you resolved the issue. Thanks.

Tong Li
OpenStack Kafka Community Development
Building 501/B205
liton...@us.ibm.com
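A common workaround for this class of test flakiness (alongside the tracking in KAFKA-1501) is to stop hard-coding ports and instead bind to port 0, letting the OS hand out a free ephemeral port. A minimal sketch:

```python
import socket

def choose_free_port() -> int:
    """Bind to port 0 so the OS picks a free ephemeral port -- avoids
    'Address already in use' when tests run in parallel or ports linger
    in TIME_WAIT on a shared machine."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]

print(0 < choose_free_port() < 65536)  # True
```

There is still a small race between closing this probe socket and the test server binding the returned port, which is why binding the real server to port 0 directly is preferable when the framework allows it.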
[jira] [Created] (KAFKA-2021) Consolidate test classes for KafkaConfig
Gwen Shapira created KAFKA-2021:
---
Summary: Consolidate test classes for KafkaConfig
Key: KAFKA-2021
URL: https://issues.apache.org/jira/browse/KAFKA-2021
Project: Kafka
Issue Type: Task
Reporter: Gwen Shapira
Priority: Minor

We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest, and kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala). I think consolidating them into one test class (or at least renaming them so it is clear how they differ) would make a lot of sense.
java.net.BindException: Address already in use
Hi guys, when I ran the tests, I got a lot of these exceptions. I wonder if you had similar problems running the tests and how you resolved the issue. Thanks.

Tong Li
OpenStack Kafka Community Development
Building 501/B205
liton...@us.ibm.com
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363051#comment-14363051 ] Honghai Chen commented on KAFKA-1646: - Hey [~jkreps] Is it OK to add a configuration like log.preallocatefile and change the three places that check Os.IsWindows to check this configuration instead? Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150312_200352.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out consistently on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds statements like 'if(Os.isWindows)' or adds methods used only on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
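The preallocation idea behind the patch can be approximated in plain Java with RandomAccessFile.setLength. The sketch below is hedged: the preallocate flag is a stand-in for the proposed log.preallocatefile-style config, and this is not the patch's actual code.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch of segment preallocation: when enabled (cf. the proposed
// log.preallocatefile config), reserve the full segment size up front so
// the filesystem can lay the file out contiguously; otherwise start at
// zero length and let the file grow as messages are appended.
class Prealloc {
    public static File newSegmentFile(File dir, long segmentBytes, boolean preallocate)
            throws IOException {
        File f = File.createTempFile("segment", ".log", dir);
        if (preallocate) {
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.setLength(segmentBytes);  // extends the file without writing data
            }
        }
        return f;
    }
}
```

A preallocated segment then has to be truncated back to its real size on clean shutdown or recovery, which is what the "truncate off trailing zeros on broker restart" attachment above addresses.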
[jira] [Commented] (KAFKA-1305) Controller can hang on controlled shutdown with auto leader balance enabled
[ https://issues.apache.org/jira/browse/KAFKA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363129#comment-14363129 ] Dmitry Bugaychenko commented on KAFKA-1305: --- Even with a fast dedicated channel there will be a race condition in switching leadership. It could be removed either by complicating the protocol (e.g. the new leader should take leadership only after getting a "not a leader" response in the fetcher thread from the old one, while the old leader should stop handling produce requests, allowing fetches only from the new leader, until it gets everything), or it may be worth considering getting rid of the controller in partition leader election and using distributed elections in ZK. Controller can hang on controlled shutdown with auto leader balance enabled --- Key: KAFKA-1305 URL: https://issues.apache.org/jira/browse/KAFKA-1305 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Sriharsha Chintalapani Priority: Blocker Fix For: 0.8.2.0, 0.9.0 Attachments: KAFKA-1305.patch, KAFKA-1305.patch, KAFKA-1305_2014-10-13_07:30:45.patch This is relatively easy to reproduce, especially when doing a rolling bounce. What happened here is as follows: 1. The previous controller was bounced and broker 265 became the new controller. 2. I went on to do a controlled shutdown of broker 265 (the new controller). 3. In the meantime the automatically scheduled preferred replica leader election process started doing its thing and began sending LeaderAndIsrRequests/UpdateMetadataRequests to itself (and other brokers) (t@113 below). 4. While that's happening, the controlled shutdown process on 265 succeeds and proceeds to deregister itself from ZooKeeper and shut down the socket server. 5. (ReplicaStateMachine actually removes deregistered brokers from the controller channel manager's list of brokers to send requests to. 
However, that removal cannot take place (t@18 below) because preferred replica leader election task owns the controller lock.) 6. So the request thread to broker 265 gets into infinite retries. 7. The entire broker shutdown process is blocked on controller shutdown for the same reason (it needs to acquire the controller lock). Relevant portions from the thread-dump: Controller-265-to-broker-265-send-thread - Thread t@113 java.lang.Thread.State: TIMED_WAITING at java.lang.Thread.sleep(Native Method) at kafka.controller.RequestSendThread$$anonfun$liftedTree1$1$1.apply$mcV$sp(ControllerChannelManager.scala:143) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:46) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:46) at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:143) at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131) - locked java.lang.Object@6dbf14a7 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) Locked ownable synchronizers: - None ... 
Thread-4 - Thread t@17 java.lang.Thread.State: WAITING on java.util.concurrent.locks.ReentrantLock$NonfairSync@4836840 owned by: kafka-scheduler-0 at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:842) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1178) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) at kafka.utils.Utils$.inLock(Utils.scala:536) at kafka.controller.KafkaController.shutdown(KafkaController.scala:642) at kafka.server.KafkaServer$$anonfun$shutdown$9.apply$mcV$sp(KafkaServer.scala:242) at kafka.utils.Utils$.swallow(Utils.scala:167) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:46) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:46) at kafka.server.KafkaServer.shutdown(KafkaServer.scala:242) at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:46) at kafka.Kafka$$anon$1.run(Kafka.scala:42) ...
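As a generic illustration of one way out of this hang (not the actual KAFKA-1305 fix), the shutdown path could bound its wait on the controller lock with ReentrantLock.tryLock instead of blocking forever, so a stuck lock surfaces as an error rather than an indefinite park like Thread-4 above:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: bound the wait for a contended lock during shutdown.
// Class and method names are hypothetical, not Kafka's.
class ShutdownWithTimeout {
    private final ReentrantLock controllerLock = new ReentrantLock();

    // Instead of lock() -- which is where Thread-4 parks in the dump above --
    // use tryLock with a timeout so shutdown can fail fast and report the
    // contention (e.g. a rebalance task still holding the lock).
    public boolean shutdown(long timeoutMs) throws InterruptedException {
        if (!controllerLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return false;  // lock held elsewhere; caller can log and retry or abort
        }
        try {
            // ... release controller resources here ...
            return true;
        } finally {
            controllerLock.unlock();
        }
    }
}
```

This only converts the hang into a detectable failure; the underlying fix still has to stop the request-send thread from retrying a deregistered broker forever.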
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363164#comment-14363164 ] Tong Li commented on KAFKA-1926: @Jun Rao, awesome comments. I will follow the directions and provide a new patch set. This also confirms the direction I am going in. Thanks. Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Labels: newbie, patch Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate duplicate code in the server and replace it with the new common code. As such we should evaluate each method in the Scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general-purpose utility in active use that is not Kafka-specific. If we migrate it we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need to make use of Scala features to be convenient. 3. Delete it if it is not used or has a bad API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363399#comment-14363399 ] Gwen Shapira commented on KAFKA-1809: - Updated reviewboard https://reviews.apache.org/r/28769/diff/ against branch trunk Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1809: Attachment: KAFKA-1809_2015-03-16_09:40:49.patch Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Agree. Since throwing an exception when close() is called in a callback won't work (we catch all exceptions thrown from callbacks), blocking might be the only option we have here. Jiangjie (Becket) Qin On 3/15/15, 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good, or alternately throwing an exception directly from close(). Basically I would just worry about subtly doing something slightly different from what the user asked for, as it will be hard to notice that behavior difference. -Jay On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I have modified the KIP as you suggested. I think as long as we have a consistent definition of timeout across the Kafka interfaces, there should be no problem. And I also agree it is better if we can make the producer block when close() is called from the sender thread, so the user will notice something went wrong. Thanks. Jiangjie (Becket) Qin On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jiangjie, I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this appears in other APIs, but it is a constant cause of bugs. Let's not repeat that mistake. Let's make close(0) wait for 0. We don't need a way to wait indefinitely as we already have close(), so having a magical constant for that is redundant. Calling close() from the I/O thread was already possible and would block indefinitely. I think trying to silently change the behavior is probably not right. I.e. if the user calls close() in the callback there is actually some misunderstanding and they need to think more; silently making this not block will hide the problem from them, which is the opposite of what we want. -Jay On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Joe and Jay, Thanks for the comments on the voting thread. 
Since it seems we will probably have more discussion on this, I am just replying from the discussion thread here. I’ve updated the KIP page to make it look less half-baked; apologies for the rush... The contract in the current KIP is: 1. close() - wait until all requests are either sent or reach the request timeout. 2. close(-1, TimeUnit.MILLISECONDS) - close immediately. 3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait until all requests are sent or reach the request timeout. 4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5 milliseconds; if something goes wrong, just shut down the producer anyway, my callback will handle the failures. About how we define what the timeout value stands for, I actually struggled a little bit when I wrote the patch. Intuitively, close(0) should mean immediately; however, it seems that all the existing Java classes have the convention that timeout=0 means no timeout or never time out (Thread.join(0), Object.wait(0), etc.). So the dilemma is whether we follow the intuition or follow the convention. What I chose is to follow the convention but document the interface to make users aware of the usage. The reason is that producer.close() is a public interface, so it might be better to follow the Java convention. The selector, on the other hand, is not a public interface used by users, so as long as it makes sense to us, it is less of a problem for it to differ from the Java convention. That said, since consumer.poll(timeout) is also a public interface, I think it also makes sense to give producer.close() the same definition as consumer.poll(timeout). The main argument for keeping a timeout in close would be separating the close timeout from the request timeout, which probably makes sense. I would guess typically the request timeout would be long (e.g. 60 seconds) because we might want to consider retries with backoff time. 
If we have multiple batches in the accumulator, in the worst case it could take up to several minutes to complete all the requests. But when we close a producer, we might not want to wait that long, as it might cause some other problem like a deployment tool timeout. There is also a subtle difference between close(timeout) and flush(timeout). The only purpose of flush() is to write data to the broker, so it makes perfect sense to wait until the request timeout. I think that is why flush(timeout) looks strange. On the other hand, the top priority for close() is to close the producer rather than flush data, so close(timeout) gives a guarantee of bounded waiting for its main job. Sorry for the confusion about the forceClose flag. It is not a public interface; I mentioned it in the Proposed Changes section, which I thought was supposed to provide implementation details. Thanks again for all the comments and suggestions! Jiangjie (Becket) Qin On 3/10/15, 8:57 PM, Jiangjie Qin j...@linkedin.com wrote: The KIP page has been
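The four-case contract quoted in this thread can be condensed into a small helper that maps the argument of close(timeout) to an effective wait bound. This is purely an illustration of the KIP draft's table with a made-up helper name; the real producer exposes no such function, and the mapping for 0 reflects the "convention" choice that Jay argues against above.

```java
// Illustrative only: maps the timeout passed to close(timeoutMs) to the
// effective wait in milliseconds, per the contract in the current KIP draft:
//   close(t), t < 0  -> close immediately (wait 0)
//   close(0)         -> same as close(): unbounded, under the Java
//                       "0 means no timeout" convention (Thread.join(0), etc.)
//   close(t), t > 0  -> best effort for t milliseconds
class CloseContract {
    static long effectiveWaitMs(long timeoutMs) {
        if (timeoutMs < 0) return 0L;               // immediate shutdown
        if (timeoutMs == 0) return Long.MAX_VALUE;  // convention: 0 = unbounded
        return timeoutMs;                           // bounded best effort
    }
}
```

Written out this way, the discontinuity Jay objects to is easy to see: effectiveWaitMs is not monotone around 0, since -1 maps to 0 while 0 maps to unbounded.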
[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363429#comment-14363429 ] Gwen Shapira commented on KAFKA-1809: - [~junrao]:
1. Addressed all sub-points.
2. I think using actual version numbers in the config is more admin-friendly. I changed the parseConfig function to just use the 3 significant version numbers.
3. Fixed.
4. Good catch, that was silly :) Fixed.
5. Fixed.
6. I had to change this configuration in order to test end-to-end with a non-default protocol.
7. Fixed.
8. Yep.
9. Merge bug. Fixed this.
10. TRACE is used for testing only, because it was important to make sure that things still work when I use the non-default protocol. For example in SocketServerTest, but I also used it in manual testing.
11. Fixed.
12. For performance reasons, especially when validating the segments at the end. In the current patch I changed it for all replica test cases.
13. Ick! UpdateMetadataRequest was missing from the ser/de test suite! Added it and validated that it catches the issue.
14. Ran system tests (replica test cases only). 14.2 Ran with console producer and consumer. I'll open a separate JIRA for the rest of the tools, but I think that can go in after we add the security protocol implementations. 14.3 Opened a separate JIRA for this. 
Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Question about concurrency during Log config change
Jay, Thanks for the quick response. Yes, this might not be that harmful for users; I'm not sure about that. But it definitely looks like a data race. Your solution is simple and should work, though it's hard to tell promptly when concurrency is involved. Initially I was looking through this code to understand whether we could adopt this approach for the global broker config. In that case your solution would be harder to implement, since we access the broker's config in many different places. But that's another story. Thanks, Andrii Biletskyi On Mon, Mar 16, 2015 at 5:56 PM, Jay Kreps jay.kr...@gmail.com wrote: You are correct. Each read will be a valid value but there is no guarantee that subsequent reads will read from the same config. I don't think that is a problem, do you? If we want to strengthen the guarantee we can grab the config once in the method, val config = log.config, and then do however many accesses against that variable, which will remain constant even if the config is updated in the course of the method. -Jay On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi all, I was looking through the code related to the dynamic Log config change feature and noticed the way we deal with concurrency there. I have a question about it. The Log class holds a volatile LogConfig property, and almost all methods in Log.scala are synchronized on a private lock object. But the code in TopicConfigManager ( https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108 ) which substitutes Log's logConfig is not synchronized. 
Code execution example:
Thread 1: Log.append - Log:288, config.*maxMessageSize* is accessed https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
Thread 2: handles log config change - TopicConfigManager:108 (see above) substitutes the log's config - changes *maxMessageSize* and *segmentSize*
Thread 1: Log.append - Log:299, code accesses config.*segmentSize* and picks up the updated config setting https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
So it looks like we accessed the object in a partial state: within the scope of one procedure (Log.append) we took one setting from the old state (maxMessageSize) and the other from the updated state. Methods in Log are synchronized, as mentioned above. But logConfig is only volatile, which solves visibility problems but doesn't prevent it from being changed by another thread, as I understand it. Am I missing something here? Thanks, Andrii Biletskyi
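Jay's suggested fix in the reply above (read the volatile reference once and use that snapshot for the rest of the method) can be sketched in Java as follows. Class and field names here are hypothetical stand-ins for the Scala Log/LogConfig classes, with made-up default values.

```java
// Sketch of the snapshot pattern for a volatile, wholesale-replaced config.
class LogSketch {
    // Immutable config object; dynamic config change swaps the whole reference.
    static final class Config {
        final int maxMessageSize;
        final int segmentSize;
        Config(int maxMessageSize, int segmentSize) {
            this.maxMessageSize = maxMessageSize;
            this.segmentSize = segmentSize;
        }
    }

    // volatile guarantees visibility of the swap, but two separate reads of
    // this field may observe two different Config objects -- exactly the
    // partial-state race described in the thread.
    private volatile Config config = new Config(1_000_000, 1_073_741_824);

    void updateConfig(Config newConfig) { config = newConfig; }

    // Fix: capture the reference once, so maxMessageSize and segmentSize are
    // always taken from the same Config instance within one append call.
    int[] appendLimits() {
        Config snapshot = config;  // single volatile read
        return new int[]{ snapshot.maxMessageSize, snapshot.segmentSize };
    }
}
```

Because each Config instance is immutable, the snapshot is internally consistent even if updateConfig runs concurrently; the method simply operates on a slightly stale but coherent view.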
[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363465#comment-14363465 ] Gwen Shapira commented on KAFKA-1809: - Updated reviewboard https://reviews.apache.org/r/28769/diff/ against branch trunk Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1928: - Issue Type: Sub-task (was: Improvement) Parent: KAFKA-1682 Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Gwen Shapira As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector which should significantly simplify it's code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1928: - Component/s: security Move kafka.network over to using the network classes in org.apache.kafka.common.network --- Key: KAFKA-1928 URL: https://issues.apache.org/jira/browse/KAFKA-1928 Project: Kafka Issue Type: Improvement Components: security Reporter: Jay Kreps Assignee: Gwen Shapira As part of the common package we introduced a bunch of network related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector which should significantly simplify it's code. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 28769: Patch for KAFKA-1809
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/ --- (Updated March 16, 2015, 4:41 p.m.) Review request for kafka. Bugs: KAFKA-1809 https://issues.apache.org/jira/browse/KAFKA-1809 Repository: kafka Description (updated) --- forgot rest of patch Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java fa9daaef66ff7961e1c46cd0cd8fed18a53bccd8 clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/utils/Utils.java 920b51a6c3c99639fbc9dc0656373c19fabd clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java c899813d55b9c4786adde3d840f040d6645d27c8 config/server.properties 1614260b71a658b405bb24157c8f12b1f1031aa5 core/src/main/scala/kafka/admin/AdminUtils.scala b700110f2d7f1ede235af55d8e37e1b5592c6c7d core/src/main/scala/kafka/admin/TopicCommand.scala f400b71f8444fffd3fc1d8398a283682390eba4e core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 24aaf954dc42e2084454fa5fc9e8f388ea95c756 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 4ff7e8f8cc695551dd5d2fe65c74f6b6c571e340 core/src/main/scala/kafka/api/TopicMetadata.scala 0190076df0adf906ecd332284f222ff974b315fc core/src/main/scala/kafka/api/TopicMetadataResponse.scala 92ac4e687be22e4800199c0666bfac5e0059e5bb core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 530982e36b17934b8cc5fb668075a5342e142c59 core/src/main/scala/kafka/client/ClientUtils.scala ebba87f0566684c796c26cb76c64b4640a5ccfde core/src/main/scala/kafka/cluster/Broker.scala 0060add008bb3bc4b0092f2173c469fce0120be6 core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION 
core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala PRE-CREATION core/src/main/scala/kafka/consumer/ConsumerConfig.scala 9ebbee6c16dc83767297c729d2d74ebbd063a993 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 152fda5d1dcdf319399fdeeb8457006090ebe56c core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala cca815a128419e146feff53adaeddc901bb5de1f core/src/main/scala/kafka/controller/ControllerChannelManager.scala c582191636f6188c25d62a67ff0315b56f163133 core/src/main/scala/kafka/controller/KafkaController.scala 09fc46d759b74bcdad2d2a610d9c5a93ff02423f core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala d281bb31a66fd749ecddfbe38479b6903f436831 core/src/main/scala/kafka/javaapi/TopicMetadata.scala f384e04678df10a5b46a439f475c63371bf8e32b core/src/main/scala/kafka/network/RequestChannel.scala 7b1db3dbbb2c0676f166890f566c14aa248467ab core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 core/src/main/scala/kafka/producer/ProducerPool.scala 43df70bb461dd3e385e6b20396adef3c4016a3fc core/src/main/scala/kafka/server/AbstractFetcherManager.scala 20c00cb8cc2351950edbc8cb1752905a0c26e79f core/src/main/scala/kafka/server/AbstractFetcherThread.scala e731df4b2a3e44aa3d761713a09b1070aff81430 core/src/main/scala/kafka/server/KafkaApis.scala 35af98f0bc1b6a50bd1d97a30147593f8c6a422d core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c core/src/main/scala/kafka/server/KafkaHealthcheck.scala 7907987e43404487382de7f4cc294f0d01ac15a7 core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 core/src/main/scala/kafka/server/MetadataCache.scala 6aef6e4508ecadbbcc1e12bed2054547b7aa333e core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 351dbbad3bdb709937943b336a5b0a9e0162a5e2 
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d1e7c434e77859d746b8dc68dd5d5a3740425e79 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala ba6ddd7a909df79a0f7d45e8b4a2af94ea0fceb6 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b4f903b6c7c3bb725cac7c05eb1f885906413c4d core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 111c9a8b94ce45d95551482e9fd3f8c1cccbf548 core/src/main/scala/kafka/utils/Utils.scala 738c1af9ef5de16fdf5130daab69757a14c48b5c core/src/main/scala/kafka/utils/ZkUtils.scala 7ae999ec619443d35a9cb8fbcd531fca0c51c8c0
Re: [DISCUSSION] KIP-15 close(timeout) for producer
It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1). 2. Log an error and block. (Throwing an exception will not work because we catch all exceptions thrown from the user callback; it would just lead to an error log.) My concern with the first option is that the producer will be closed even though we logged an error. I wonder whether some users would even look at the log if the producer closes normally, because from the program's behavior everything looks good. If that is the case, the error message we logged will probably just be ignored until someday when people check the log and see it. As for the second option, because the producer does not close but blocks, users will notice this the first time they run the program. They will probably look at the log to see why the producer could not be closed, and they will see the error log we put there. So they will be informed about this misuse of close() in the sender thread the first time they run the code, instead of some time later. Personally I prefer the second one because it makes it more obvious that something is wrong. Jiangjie (Becket) Qin On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner. On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote: And I also agree it is better if we can make producer block when close() is called from sender thread so user will notice something went wrong. This isn't a great experience either. Why can't we just throw an exception for a behavior we know is incorrect and we'd like the user to know. 
Blocking as a means of doing that seems wrong and annoying. On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good, or alternately throwing an exception directly from close(). Basically I would just worry about subtly doing something slightly different from what the user asked for, as it will be hard to notice that behavior difference. -Jay On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I have modified the KIP as you suggested. I think as long as we have a consistent definition of timeout across Kafka interfaces, there would be no problem. And I also agree it is better if we can make the producer block when close() is called from the sender thread so users will notice something went wrong. Thanks. Jiangjie (Becket) Qin On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jiangjie, I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this appears in other APIs, but it is a constant cause of bugs. Let's not repeat that mistake. Let's make close(0) wait for 0. We don't need a way to wait indefinitely as we already have close(), so having a magical constant for that is redundant. Calling close() from the I/O thread was already possible and would block indefinitely. I think trying to silently change the behavior is probably not right. I.e. if the user calls close() in the callback there is actually some misunderstanding and they need to think more; silently making this not block will hide the problem from them, which is the opposite of what we want. -Jay On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Joe and Jay, Thanks for the comments on the voting thread. Since it seems we will probably have more discussion on this, I am just replying from the discussion thread here. I’ve updated the KIP page to make it look less half-baked; apologies for the rush... The contract in the current KIP is: 1. 
close() - wait until all requests either are sent or reach the request timeout. 2. close(-1, TimeUnit.MILLISECONDS) - close immediately 3. close(0, TimeUnit.MILLISECONDS) - equivalent to close(), i.e. wait until all requests are sent or reach the request timeout 4. close(5, TimeUnit.MILLISECONDS) - try the best to finish sending in 5 milliseconds; if something goes wrong, just shut down the producer anyway, my callback will handle the failures. About how we define what the timeout value stands for, I actually struggled a little bit when I wrote the patch. Intuitively, close(0) should mean immediately; however, it seems that all the existing Java classes have the convention that timeout=0 means no timeout or never time out (Thread.join(0),
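The Java convention cited above, where a timeout of 0 means "no timeout", is easy to see with Thread.join. A small standalone demo (plain Java, not Kafka code):

```java
// Standalone demo of the convention the thread discusses: for
// Thread.join(millis), a timeout of 0 means "wait forever", which is
// exactly the close(0) ambiguity being debated for the producer.
public class JoinZeroDemo {
    static boolean aliveAfterJoinZero() throws InterruptedException {
        Thread worker = new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
        });
        worker.start();
        worker.join(0); // 0 means no timeout: blocks until the worker exits
        return worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(aliveAfterJoinZero()); // prints false: join(0) waited
    }
}
```

This is the precedent Jay argues against copying: under his proposal close(0) would wait 0 ms, since close() already covers the wait-forever case.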
[jira] [Updated] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-1809: Attachment: KAFKA-1809_2015-03-16_09:02:18.patch Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
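The multi-listener design linked above describes endpoints in a "protocol://host:port" notation. As a rough illustration of what parsing such a string involves (class and method names here are hypothetical, not the actual kafka.cluster.EndPoint API):

```java
// Hypothetical sketch of parsing a "protocol://host:port" listener
// string; illustrative only, not the actual kafka.cluster.EndPoint code.
public class EndPointSketch {
    final String protocol;
    final String host;
    final int port;

    EndPointSketch(String protocol, String host, int port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
    }

    static EndPointSketch parse(String s) {
        String[] parts = s.split("://", 2);    // e.g. ["PLAINTEXT", "broker1:9092"]
        int colon = parts[1].lastIndexOf(':'); // split host from port
        return new EndPointSketch(parts[0],
                parts[1].substring(0, colon),
                Integer.parseInt(parts[1].substring(colon + 1)));
    }

    public static void main(String[] args) {
        EndPointSketch e = parse("PLAINTEXT://broker1:9092");
        System.out.println(e.protocol + " " + e.host + " " + e.port); // PLAINTEXT broker1 9092
    }
}
```

The point of the refactoring is that a broker carries a list of such endpoints, one per security protocol, rather than a single host+port pair.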
Re: Review Request 28769: Patch for KAFKA-1809
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28769/ --- (Updated March 16, 2015, 4:02 p.m.) Review request for kafka. Bugs: KAFKA-1809 https://issues.apache.org/jira/browse/KAFKA-1809 Repository: kafka Description (updated) --- squashing multi-broker-endpoint patches Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/ApiVersion.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java PRE-CREATION core/src/main/scala/kafka/cluster/BrokerEndPoint.scala PRE-CREATION core/src/main/scala/kafka/cluster/EndPoint.scala PRE-CREATION core/src/main/scala/kafka/cluster/SecurityProtocol.scala PRE-CREATION core/src/main/scala/kafka/common/BrokerEndPointNotAvailableException.scala PRE-CREATION core/src/test/scala/unit/kafka/cluster/BrokerTest.scala PRE-CREATION system_test/run_all.sh PRE-CREATION system_test/run_all_replica.sh PRE-CREATION Diff: https://reviews.apache.org/r/28769/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-1994) Evaluate performance effect of chroot check on Topic creation
[ https://issues.apache.org/jira/browse/KAFKA-1994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363349#comment-14363349 ] Gwen Shapira commented on KAFKA-1994: - New patch looks good. [~singhashish], can you share how the performance of the code with the new patch compares to that of the older solution and to that of createPersistent() without any of the checks? Evaluate performance effect of chroot check on Topic creation - Key: KAFKA-1994 URL: https://issues.apache.org/jira/browse/KAFKA-1994 Project: Kafka Issue Type: Improvement Reporter: Ashish K Singh Assignee: Ashish K Singh Attachments: KAFKA-1994.patch, KAFKA-1994_2015-03-03_18:19:45.patch KAFKA-1664 adds a check for chroot while creating a node in ZK. ZkPath checks if the namespace exists before trying to create a path in ZK. This raises a concern that checking the namespace for each path creation might be unnecessary and can potentially make creations expensive. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Question about concurrency during Log config change
Hi all, I was looking through the code related to the dynamic Log config change feature and noticed the way we deal with concurrency there. I have a question about it. The Log class holds a volatile LogConfig property, and almost all methods in Log.scala are synchronized on a private lock object. But the code in TopicConfigManager ( https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108 ) which substitutes the Log's logConfig is not synchronized. Code execution example: Thread 1: Log.append - Log:288 config.*maxMessageSize* is accessed https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288 Thread 2: handles log config change - TopicConfigManager:108 (see above) substitutes the log's config - changes *maxMessageSize* and *segmentSize* Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and picks up the updated config setting https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299 So it looks like we accessed the object in a partial state - within the scope of one procedure (Log.append) we took one setting from the old state (maxMessageSize) and the other one from the updated state. Methods in Log are synchronized, as mentioned above. But logConfig is only volatile, which solves visibility problems but doesn't prevent it from being changed in another thread, as I understand. Am I missing something here? Thanks, Andrii Biletskyi
[jira] [Assigned] (KAFKA-2021) Consolidate test classes for KafkaConfig
[ https://issues.apache.org/jira/browse/KAFKA-2021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrii Biletskyi reassigned KAFKA-2021: --- Assignee: Andrii Biletskyi Consolidate test classes for KafkaConfig Key: KAFKA-2021 URL: https://issues.apache.org/jira/browse/KAFKA-2021 Project: Kafka Issue Type: Task Reporter: Gwen Shapira Assignee: Andrii Biletskyi Priority: Minor We have kafka.server.KafkaConfigTest, KafkaConfigConfigDefTest and kafka.unit.KafkaTest (in a file called KafkaConfigTest.scala) I think consolidating them into one test class (or at least renaming them so it will be clear how they are different) will make a lot of sense. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Question about concurrency during Log config change
You are correct. Each read will see a valid value, but there is no guarantee that subsequent reads will read from the same config. I don't think that is a problem, do you? If we want to strengthen the guarantee we can grab the config once in the method, val config = log.config, and then do however many accesses against that variable, which will remain constant even if the config is updated in the course of the method. -Jay On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi all, I was looking through the code related to the dynamic Log config change feature and noticed the way we deal with concurrency there. I have a question about it. The Log class holds a volatile LogConfig property, and almost all methods in Log.scala are synchronized on a private lock object. But the code in TopicConfigManager ( https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108 ) which substitutes the Log's logConfig is not synchronized. Code execution example: Thread 1: Log.append - Log:288 config.*maxMessageSize* is accessed https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288 Thread 2: handles log config change - TopicConfigManager:108 (see above) substitutes the log's config - changes *maxMessageSize* and *segmentSize* Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and picks up the updated config setting https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299 So it looks like we accessed the object in a partial state - within the scope of one procedure (Log.append) we took one setting from the old state (maxMessageSize) and the other one from the updated state. Methods in Log are synchronized, as mentioned above. But logConfig is only volatile, which solves visibility problems but doesn't prevent it from being changed in another thread, as I understand. 
Am I missing something here? Thanks, Andrii Biletskyi
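Jay's suggestion is the standard snapshot pattern for a volatile reference to an immutable config object. A minimal standalone sketch (hypothetical classes, not the real kafka.log code):

```java
// Hypothetical sketch, not the real kafka.log classes: every field
// access through the volatile reference re-reads it, so a concurrent
// swap can yield a "torn" view of two configs; snapshotting the
// reference once fixes that.
class ConfigSketch {
    static final class LogConfig {
        final int maxMessageSize;
        final int segmentSize;
        LogConfig(int maxMessageSize, int segmentSize) {
            this.maxMessageSize = maxMessageSize;
            this.segmentSize = segmentSize;
        }
    }

    volatile LogConfig config = new LogConfig(1_000_000, 1 << 30);

    // Racy: two separate volatile reads; a config swap in between can
    // mix an old maxMessageSize with a new segmentSize.
    int[] racyRead() {
        return new int[] { config.maxMessageSize, config.segmentSize };
    }

    // Safe: one volatile read, after which both fields come from the
    // same immutable LogConfig instance.
    int[] snapshotRead() {
        LogConfig c = config;
        return new int[] { c.maxMessageSize, c.segmentSize };
    }
}
```

Because LogConfig is immutable, the snapshot needs no locking: the method simply cannot observe a mix of two configs once the reference is read only once.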
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their programs it is unlikely that close() will be triggered and the problem will be detected. So it seems to me that from the error detection aspect these two options are the same, as people will usually detect it from the producer metrics all dropping to 0. Guozhang On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1) 2. Log an error and block. (Throwing an exception will not work because we catch all the exceptions thrown from user callbacks. It will just lead to an error log.) My concern for the first option is that the producer will be closed even if we logged an error. I am wondering if some users would not even take a look at the log if the producer closes normally, because from the program's behavior everything looks good. If that is the case, the error message we logged will probably just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, users will notice this the first time they run the program. They will probably look at the log to see why the producer could not be closed, and they will see the error log we put there. So they will get informed about this misuse of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it is more obvious that something was wrong. 
Jiangjie (Becket) Qin On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner. On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote: And I also agree it is better if we can make the producer block when close() is called from the sender thread so users will notice something went wrong. This isn't a great experience either. Why can't we just throw an exception for a behavior we know is incorrect and we'd like the user to know about? Blocking as a means of doing that seems wrong and annoying. On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good, or alternately throwing an exception directly from close(). Basically I would just worry about subtly doing something slightly different from what the user asked for, as it will be hard to notice that behavior difference. -Jay On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I have modified the KIP as you suggested. I think as long as we have a consistent definition of timeout across Kafka interfaces, there would be no problem. And I also agree it is better if we can make the producer block when close() is called from the sender thread so users will notice something went wrong. Thanks. Jiangjie (Becket) Qin On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jiangjie, I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this appears in other APIs, but it is a constant cause of bugs. Let's not repeat that mistake. Let's make close(0) wait for 0. 
We don't need a way to wait indefinitely as we already have close(), so having a magical constant for that is redundant. Calling close() from the I/O thread was already possible and would block indefinitely. I think trying to silently change the behavior is probably not right. I.e. if the user calls close() in the callback there is actually some misunderstanding and they need to think more; silently making this not block will hide the problem from them, which is the opposite of what we want. -Jay On Thu, Mar 12, 2015 at 1:49 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Joe and Jay, Thanks for the comments on the voting thread. Since it seems we will probably have more discussion on this, I am just replying from the discussion thread here. I’ve updated the KIP page to make it look less half-baked; apologies for the rush... The contract in the current KIP is: 1. close() - wait until all requests either are sent or reach the request timeout. 2. close(-1, TimeUnit.MILLISECONDS) - close immediately 3. close(0,
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Jun, Answering your questions: 101. If I understand you correctly, you are saying future producer versions (which will be ported to TMR_V1) won't be able to automatically create topics (if we unconditionally remove topic creation from there). But we need to preserve this logic. Ok, about your proposal: I'm not a big fan either, when it comes to differentiating clients directly in the protocol schema. And also I'm not sure I understand why auto.create.topics.enable is a server-side configuration at all. Can we deprecate this setting in future versions, add this setting to the producer and, based on that, upon receiving UnknownTopic create the topic explicitly by a separate producer call via the adminClient? 102.1. Hm, yes. It's because we want to support batching and at the same time we want to give descriptive error messages to clients. Since AdminClient holds the context to construct such messages (e.g. the AdminClient layer can know that InvalidArgumentsCode means two cases: either an invalid number - e.g. -1; or replication-factor was provided while the partitions argument wasn't) - I wrapped responses in Exceptions. But I'm open to any other ideas, this was just an initial version. 102.2. Yes, I agree. I'll change that to probably some other dto. Thanks, Andrii Biletskyi On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao j...@confluent.io wrote: Andrii, 101. That's what I was thinking too, but it may not be that simple. In TopicMetadataRequest_V1, we can let it not trigger auto topic creation. Then, on the producer side, if it gets an UnknownTopicException, it can explicitly issue a createTopicRequest for auto topic creation. On the consumer side, it will never issue createTopicRequest. This works when auto topic creation is enabled on the broker side. However, I am not sure how things will work when auto topic creation is disabled on the broker side. In this case, we want to have a way to manually create a topic, potentially through admin commands. 
However, then we need a way to distinguish createTopicRequests issued from the producer clients and from the admin tools. Maybe we can add a new field in createTopicRequest and set it differently in the producer client and the admin client. However, I am not sure if that's the best approach. 2. Yes, refactoring existing requests is a non-trivial amount of work. I posted some comments in KAFKA-1927. We will probably have to fix KAFKA-1927 first, before adding the new logic in KAFKA-1694. Otherwise, the changes will be too big. 102. About the AdminClient: 102.1. It's a bit weird that we return exceptions in the api. It seems that we should either return an error code or throw an exception when getting the response state. 102.2. We probably shouldn't explicitly use the request object in the api. Not every request evolution requires an api change. Thanks, Jun On Fri, Mar 13, 2015 at 4:08 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, Thanks for your comments. Answers inline: 100. There are a few fields such as ReplicaAssignment, ReassignPartitionRequest, and PartitionsSerialized that are represented as a string, but contain composite structures in json. Could we flatten them out directly in the protocol definition as arrays/records? Yes, now with the Admin Client this looks a bit weird. My initial motivation was: ReassignPartitionCommand accepts input in json, and we want to keep the tools' interfaces unchanged where possible. If we port it to a deserialized format, in the CLI (/tools project) we will have to add some json library, since /tools is written in java and we'll need to deserialize the json file provided by a user. Can we quickly agree on what this library should be (Jackson, GSON, whatever)? 101. Does TopicMetadataRequest v1 still trigger auto topic creation? This will be a bit weird now that we have a separate topic creation api. 
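For reference, the reassignment JSON discussed above (the input ReassignPartitionCommand accepts and that would need deserializing in /tools) looks roughly like the following; topic names and replica ids here are made-up example values:

```json
{
  "version": 1,
  "partitions": [
    { "topic": "my-topic", "partition": 0, "replicas": [1, 2] },
    { "topic": "my-topic", "partition": 1, "replicas": [2, 3] }
  ]
}
```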
Have you thought about how the new createTopicRequest and TopicMetadataRequest v1 will be used in the producer/consumer client, in addition to admin tools? For example, ideally, we don't want TopicMetadataRequest from the consumer to trigger auto topic creation. I agree, this strange logic should be fixed. I'm not confident in this part of Kafka, so correct me if I'm wrong, but it doesn't look like a hard thing to do; I think we can leverage the AdminClient for that in the Producer and unconditionally remove topic creation from TopicMetadataRequest_V1. 2. I think Jay meant getting rid of scala classes like HeartbeatRequestAndHeader and HeartbeatResponseAndHeader. We did that as a stop-gap thing when adding the new requests for the consumers. However, the long term plan is to get rid of all those and just reuse the java request/response in the client. Since this KIP proposes to add a significant number of new requests, perhaps we should bite the bullet and clean up the existing scala requests first before adding new ones? Yes,
[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer
[ https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363625#comment-14363625 ] Joseph Holsten commented on KAFKA-2019: --- [~becket_qin] could you provide an example of the worst case unbalance for this code? I'm having trouble seeing how the hash-ordered round robin can have a worst case of more than number-of-consumer-threads. Here's my reasoning: With the current consumer-ordered sort, the threadAssignor will iterate over the sequence: - c0:t0 - c0:t1 - c0:t2 - c0:t3 - c1:t0 - c1:t1 - c1:t2 - c1:t3 With the hash-ordered sort, the sequence could have a best case of: - c0:t0 - c1:t0 - c0:t1 - c1:t1 - c0:t2 - c1:t2 - c0:t3 - c1:t3 and a worst case identical to the consumer-ordered sort. For a partition count of {{(n+1)*m}}, consumer-ordered will produce a worst-case ordering every time, with {{c0}} always ending up with {{2m}} partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will produce this sort of worst case: Likelihood that {{c0}} will have all m threads selected: {code} C(n*m,m) / P(n*m,n*m) = ((n*m)!/m!((n*m)-(m))!) / ((n*m)!) {code} So the likelihood that any consumer {{c0..cn}} will have all its threads selected: {code} n * C(n*m,m) / P(n*m,n*m) = n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!) {code} So the likelihood that hash-ordering will be as unbalanced as consumer-ordered with {{n=2}}, {{m=2}}: {code} 2 * C(2*2,2) / P(2*2,2*2) = 2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) = 2 * (6) / (24) = 0.5 {code} And with {{n=2}}, {{m=4}}: {code} 2 * C(2*4,4) / P(2*4,2*4) = 2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) = 2 * 70 / 40320 = ~ 0.00347 {code} I totally agree with the point about backward incompatibility; it would not be pleasant to have different servers using different thread sequences. So this code can't go in as a modification to the existing RoundRobinAssignor - perhaps as a new assignor? 
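The two numeric cases above can be checked mechanically. A small standalone program (plain Java, independent of Kafka) evaluating n * C(n*m, m) / P(n*m, n*m):

```java
// Numeric check of the likelihood formula in the comment above:
// n * C(n*m, m) / P(n*m, n*m), where P(k, k) = k!.
public class UnbalanceOdds {
    static long factorial(int k) {
        long f = 1;
        for (int i = 2; i <= k; i++) f *= i;
        return f;
    }

    static long choose(int n, int k) {
        return factorial(n) / (factorial(k) * factorial(n - k));
    }

    static double worstCaseLikelihood(int n, int m) {
        return (double) n * choose(n * m, m) / factorial(n * m);
    }

    public static void main(String[] args) {
        System.out.println(worstCaseLikelihood(2, 2)); // 0.5
        System.out.println(worstCaseLikelihood(2, 4)); // ~0.00347 (2 * 70 / 40320)
    }
}
```

The naive long factorial overflows past 20!, so this only works for the small n*m used in the comment.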
RoundRobinAssignor clusters by consumer --- Key: KAFKA-2019 URL: https://issues.apache.org/jira/browse/KAFKA-2019 Project: Kafka Issue Type: Bug Components: consumer Reporter: Joseph Holsten Assignee: Neha Narkhede Priority: Minor Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, KAFKA-2019.patch When rolling out a change today, I noticed that some of my consumers are greedy, taking far more partitions than others. The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds sorted by toString, which is {{ %s-%d.format(consumer, threadId)}}. This causes each consumer's threads to be adjacent to each other. One possible fix would be to define ConsumerThreadId.hashCode, and sort by that. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-2019) RoundRobinAssignor clusters by consumer
[ https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363625#comment-14363625 ] Joseph Holsten edited comment on KAFKA-2019 at 3/16/15 6:10 PM: [~becket_qin] could you provide an example of the worst case unbalance for this code? I'm having trouble seeing how the hash-ordered round robin can have a worst case of more than number-of-consumer-threads. Here's my reasoning: With the current consumer-ordered sort, the threadAssignor will iterate over the sequence: - c0:t0 - c0:t1 - c0:t2 - c0:t3 - c1:t0 - c1:t1 - c1:t2 - c1:t3 With the hash-ordered sort, the sequence could have a best case of: - c0:t0 - c1:t0 - c0:t1 - c1:t1 - c0:t2 - c1:t2 - c0:t3 - c1:t3 and a worst case identical to the consumer-ordered sort. For a partition count of {{(n+1)*m}}, consumer-ordered will produce a worst-case ordering every time, with {{c0}} always ending up with {{2m}} partitions, and {{c1..cn}} ending up with m partitions. Hash-ordered will produce this sort of worst case: Likelihood that {{c0}} will have all m threads selected: {code} C(n*m,m) / P(n*m,n*m) = ((n*m)!/m!((n*m)-(m))!) / ((n*m)!) {code} So the likelihood that any consumer {{c0..cn}} will have all its threads selected: {code} n * C(n*m,m) / P(n*m,n*m) = n * ((n*m)!/m!((n*m)-(m))!) / ((n*m)!) {code} So the likelihood that hash-ordering will be as unbalanced as consumer-ordered with {{n=2}}, {{m=2}}: {code} 2 * C(2*2,2) / P(2*2,2*2) = 2 * ((2*2)!/2!((2*2)-(2))!) / ((2*2)!) = 2 * (6) / (24) = 0.5 {code} And with {{n=2}}, {{m=4}}: {code} 2 * C(2*4,4) / P(2*4,2*4) = 2 * ((2*4)!/4!((2*4)-(4))!) / ((2*4)!) = 2 * 70 / 40320 = ~ 0.00347 {code} I totally agree with the point about backward incompatibility; it would not be pleasant to have different servers using different thread sequences. So this code can't go in as a modification to the existing RoundRobinAssignor - perhaps as a new assignor? 
Re: Review Request 31893: Patch for KAFKA-2013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/#review76566 --- Thanks for the patch. A few comments. core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124137 Do we need to make end volatile since it's being updated in a separate thread? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124141 Would it be better to rename this to something like latencyToComplete? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124142 Variable due doesn't seem to be used? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124147 I guess the sleep will be added when the actual rate exceeds the target rate? Would it be better to rename qtime as requestArrivalTime and interval as requestArrivalInterval? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124139 It would be useful to make the # of keys configurable. core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124138 So far, we haven't used this syntax for println. For consistency, perhaps it's better to use the existing way of string formatting. core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124143 Could we add some comments on the meaning of mu and sigma? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124144 Could we add some comments for the class? In particular, what does lambda mean? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124145 It would be helpful to provide a high level description of what kind of distribution we get in the samples. 
Also, is there a particular reason that we pick LogNormal distribution instead of just normal distribution? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124150 Could we add a bit of comment on how the sampling works? I guess it tries to spread the # requests into a 1000ms interval and returns the gap for the next request on every next() call? Also, is there a particular reason that we want to choose exponential distribution to spread those requests instead of a simple uniform distribution (as done in ProducerPerformance)? core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala https://reviews.apache.org/r/31893/#comment124148 Is there a particular reason that we need to override isCompleted()? Typically, only tryComplete() and onComplete() need to be overridden in a subclass of DelayedOperation. Actually, I am not sure how we complete the requests before the timeout is reached since there is no explicit call for tryComplete()? - Jun Rao On March 10, 2015, 4:41 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/ --- (Updated March 10, 2015, 4:41 p.m.) Review request for kafka. Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
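For readers following the review's question about spreading requests over an interval: exponential inter-arrival gaps model a Poisson arrival process at a target rate, sampled as -ln(U)/rate for uniform U. A minimal sketch of that technique (this is an illustration, not the actual TestPurgatoryPerformance code):

```java
import java.util.Random;

// Sketch of exponential inter-arrival sampling: for a Poisson arrival
// process at `ratePerSec` requests/sec, gaps between requests are
// exponentially distributed with mean 1/rate.
public class IntervalSampler {
    private final double ratePerMs;
    private final Random rng;

    IntervalSampler(double ratePerSec, long seed) {
        this.ratePerMs = ratePerSec / 1000.0;
        this.rng = new Random(seed);
    }

    /** Gap in milliseconds until the next simulated request arrival. */
    double next() {
        // Inverse-CDF sampling; (1 - u) keeps the argument in (0, 1].
        return -Math.log(1.0 - rng.nextDouble()) / ratePerMs;
    }

    public static void main(String[] args) {
        IntervalSampler s = new IntervalSampler(1000.0, 42L);
        for (int i = 0; i < 5; i++)
            System.out.printf("gap = %.4f ms%n", s.next());
    }
}
```
At 1000 requests/sec, the gaps average 1 ms; over a long run the sampled arrivals approach the target rate.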
Re: Review Request 31967: Patch for KAFKA-1546
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/#review76585 --- core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124158 Interesting point. I thought that it would be enough to simply check the lag value. But yes, this will cause the HW to be inconsistent. - Aditya Auradkar On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/ --- (Updated March 12, 2015, 8:42 p.m.) Review request for kafka. Bugs: KAFKA-1546 https://issues.apache.org/jira/browse/KAFKA-1546 Repository: kafka Description --- PATCH for KAFKA-1546 Brief summary of changes: - Added a lagBegin metric inside Replica to track the lag, in terms of time, since the replica last read up to the LEO - Using lag begin value in the check for ISR expand and shrink - Removed the max lag messages config since it is no longer necessary - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log. 
- Unit test cases to test ISR shrinkage and expansion Diffs - core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 Diff: https://reviews.apache.org/r/31967/diff/ Testing --- Thanks, Aditya Auradkar
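The time-based lag check the patch summary describes can be sketched as follows (field and method names here are illustrative assumptions, not Kafka's actual Replica API): a replica records when it first failed to read up to the leader's log end offset (LEO), and is considered out of sync only after lagging longer than replicaLagTimeMaxMs.

```java
// Illustrative sketch of a time-based ISR check, not Kafka's real code.
public class ReplicaLag {
    static final long NOT_LAGGING = -1L;
    long lagBeginMs = NOT_LAGGING;

    /** Called on every fetch; caughtUp means the fetch reached the LEO. */
    void onFetch(boolean caughtUp, long nowMs) {
        if (caughtUp) {
            lagBeginMs = NOT_LAGGING;          // caught up: reset the clock
        } else if (lagBeginMs == NOT_LAGGING) {
            lagBeginMs = nowMs;                // fell behind: start the clock
        }
    }

    /** True if the replica should be shrunk out of the ISR. */
    boolean isOutOfSync(long nowMs, long replicaLagTimeMaxMs) {
        return lagBeginMs != NOT_LAGGING
            && nowMs - lagBeginMs > replicaLagTimeMaxMs;
    }

    public static void main(String[] args) {
        ReplicaLag r = new ReplicaLag();
        r.onFetch(false, 0);                       // falls behind at t=0
        System.out.println(r.isOutOfSync(500, 1000));   // still within budget
        System.out.println(r.isOutOfSync(2000, 1000));  // lagged too long
    }
}
```
This captures why a pure message-count lag check was dropped: a slow but steadily advancing follower resets the clock on every caught-up fetch, while a stalled one eventually exceeds the time budget.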
Re: [KIP-DISCUSSION] KIP-13 Quotas
Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think its not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps its worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. 
Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like
if(error == none)
  // good, process the request
else if(error == KNOWN_ERROR_1)
  // handle known error 1
else if(error == KNOWN_ERROR_2)
  // handle known error 2
else
  throw Errors.forCode(error).exception(); // or some other default behavior
This works because we have a convention that an error is something that prevented your getting the response, so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. The problem is that since these are not errors as far as the client is concerned it should not throw an exception but process the response, but now we have created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Furthermore, I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). 
From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya -- Thanks, Ewen
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Hmm, does that mean that after close(0), the sender thread is not necessarily gone? Normally, after closing an entity, we expect all internal threads associated with the entity to be shut down completely. Thanks, Jun On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jun, Close(0) will set two flags in sender. Running=false and a newly added forceClose=true. It will also set accumulator.closed=true so no further producer.send() will succeed. The sender thread will finish executing all the callbacks in current batch of responses, then it will see the forceClose flag. It will just fail all the incomplete batches in the producer and exit. So close(0) is a non-blocking call and sender thread will not try to join itself in close(0). Thanks. Jiangjie (Becket) Qin On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote: How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks Guozhang. It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks that the problem we want to solve and the purpose we want to achieve is: If user uses close() in callback, we want to let user be aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform user about this mis-usage. The options differ in the way how we can force user to take a look at that error log. There are two scenarios: 1. User does not expect the program to exit. 2. 
User expects the program to exit. For scenario 1), blocking will probably delay the discovery of the problem. Calling close(0) exposes the problem quicker. In this scenario the producer just encounters a send failure when running normally. For scenario 2), blocking will expose the problem quickly. Calling close(-1) might hide the problem. This scenario might include: a) Unit test for a send failure. b) Message sending during a close() call from a user thread. So as a summary table:

            Scenario 1)                  Scenario 2)
Blocking    Delayed problem discovery    Guaranteed problem discovery
Close(-1)   Immediate problem discovery  Problem might be hidden

Personally I prefer blocking because it seems to provide more guarantees and is safer. Thanks. Jiangjie (Becket) Qin On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: HI Jiangjie, As far as I understand calling close() in the ioThread is not common, as it may only trigger when we saw some non-retriable error. Hence when users run their programs it is unlikely that close() will be triggered and the problem will be detected. So it seems to me that from the error detection aspect these two options seem to be the same as people will usually detect it from the producer metrics all dropping to 0. Guozhang On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1) 2. Log an error and block. (Throwing an exception will not work because we catch all the exceptions thrown from user callbacks. It will just lead to an error log.) My concern for the first option is that the producer will be closed even if we logged an error. I am wondering if some users would not even take a look at the log if the producer is closed normally. Because from the program's behavior, everything looks good. 
If that is the case, the error message we logged probably will just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, users will notice this the first time they run the program. They probably will look at the log to see why the producer could not be closed, and they will see the error log we put there. So they will get informed about this mis-usage of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it makes it more obvious that something is wrong. Jiangjie (Becket) Qin On
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like
if(error == none)
  // good, process the request
else if(error == KNOWN_ERROR_1)
  // handle known error 1
else if(error == KNOWN_ERROR_2)
  // handle known error 2
else
  throw Errors.forCode(error).exception(); // or some other default behavior
This works because we have a convention that an error is something that prevented your getting the response, so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. The problem is that since these are not errors as far as the client is concerned it should not throw an exception but process the response, but now we have created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Furthermore, I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). 
From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
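The dispatch pattern in the message above can be made concrete with a small sketch (the codes here are hypothetical stand-ins, not Kafka's real error table). Note how a new informational code that an old client does not know about falls into the default throw branch, which is exactly the forward-compatibility hazard being discussed:

```java
public class ErrorDispatch {
    // Hypothetical codes; 42 plays the role of a new "OKButThrottled"-style
    // informational code that an existing client has never heard of.
    static final short NONE = 0, KNOWN_ERROR_1 = 1, KNOWN_ERROR_2 = 2;

    static String handle(short error) {
        if (error == NONE) return "process";         // good, process the request
        else if (error == KNOWN_ERROR_1) return "handle-1"; // known error 1
        else if (error == KNOWN_ERROR_2) return "handle-2"; // known error 2
        else return "throw"; // default: treat any unknown code as an error
    }

    public static void main(String[] args) {
        // A successful response carrying an unknown informational code
        // would be rejected by every existing client:
        System.out.println(handle((short) 42)); // prints "throw"
    }
}
```
This is why the thread argues that informational flags belong in a new response field rather than in the error code.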
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think its not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps its worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like if(error == none) // good, process the request else if(error == KNOWN_ERROR_1) // handle known error 1 else if(error == KNOWN_ERROR_2) // handle known error 2 else throw Errors.forCode(error).exception(); // or some other default behavior This works because we have a convention that and error is something that prevented your getting the response so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. The problem is that since these are not errors as far as the client is concerned it should not throw an exception but process the response, but now we created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Further more I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. 
It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled response appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes. Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse; the pro of this is of course using a single field for response status like the HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error-code semantics for the users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write something 
like this:
-
if(error.isOK())
  // status code is good or the code can be simply ignored for this request type, process the request
else if(error.needsRetry())
  // throttled, transient error, etc: retry
else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling
-
Only when the clients really want to handle, for example, the FailDuetoThrottled status code specifically, do they need to:
-
if(error.isOK())
  // status code is good or the code can be simply ignored for this request type, process the request
else if(error == FailDuetoThrottled)
  // throttled: log it
else if(error.needsRetry())
  // transient error, etc: retry
else if(error.isFatal())
  // non-retriable errors, etc: notify / terminate / other handling
-
And for implementation we can probably group the codes accordingly like HTTP status codes such that we can do:
boolean Error.isOK() { return code < 300 && code >= 200; }
Guozhang On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? 
On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think its not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps its worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like if(error == none) // good, process the request else if(error == KNOWN_ERROR_1) // handle known error 1 else if(error == KNOWN_ERROR_2) // handle known error 2 else throw
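The range-grouping idea from the message above can be sketched with hypothetical HTTP-style ranges (an assumption for illustration only; Kafka's protocol does not actually define these ranges):

```java
public class StatusCode {
    // Hypothetical grouping, borrowed from HTTP conventions:
    // 2xx = OK / ignorable, 3xx = retriable, >= 400 = fatal.
    final int code;

    StatusCode(int code) { this.code = code; }

    boolean isOK()       { return code >= 200 && code < 300; }
    boolean needsRetry() { return code >= 300 && code < 400; }
    boolean isFatal()    { return code >= 400; }

    public static void main(String[] args) {
        // Clients branch on the group, not on individual codes,
        // so new codes within a known range need no client changes.
        System.out.println(new StatusCode(204).isOK());       // true
        System.out.println(new StatusCode(301).needsRetry()); // true
        System.out.println(new StatusCode(503).isFatal());    // true
    }
}
```
The design point is that a client written against the groups stays correct when a new code is added inside an existing range, which addresses the handle-every-code burden raised earlier in the thread.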
Re: [KIP-DISCUSSION] KIP-13 Quotas
We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think its not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps its worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like if(error == none) // good, process the request else if(error == KNOWN_ERROR_1) // handle known error 1 else if(error == KNOWN_ERROR_2) // handle known error 2 else throw Errors.forCode(error).exception(); // or some other default behavior This works because we have a convention that and error is something that prevented your getting the response so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. 
The problem is that since these are not errors as far as the client is concerned it should not throw an exception but process the response, but now we created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Further more I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Yeah in this sense the sender thread will not exit immediately in the close(0) call, but will only terminate after the current response batch has been processed, as will the producer instance itself. There is a reason for this though: for a clean shutdown the caller thread has to wait for the sender thread to join before closing the producer instance, but this cannot be achieved if close(0) is called by the sender thread itself (for example in KAFKA-1659, there is a proposal from Andrew Stein on using thread.interrupt and thread.stop, but if it is called by the ioThread itself the stop call will fail). Hence we came up with the flag approach to let the sender thread close as soon as it is at the barrier of the run loop. Guozhang On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote: Hmm, does that mean that after close(0), the sender thread is not necessary gone? Normally, after closing an entity, we expect all internal threads associated with the entity are shut down completely. Thanks, Jun On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jun, Close(0) will set two flags in sender. Running=false and a newly added forceClose=true. It will also set accumulator.closed=true so no further producer.send() will succeed. The sender thread will finish executing all the callbacks in current batch of responses, then it will see the forceClose flag. It will just fail all the incomplete batches in the producer and exit. So close(0) is a non-blocking call and sender thread will not try to join itself in close(0). Thanks. Jiangjie (Becket) Qin On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote: How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks Guozhang. 
It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks that the problem we want to solve and the purpose we want to achieve is: If user uses close() in callback, we want to let user be aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform user about this mis-usage. The options differ in the way how we can force user to take a look at that error log. There are two scenarios: 1. User does not expect the program to exit. 2. User expect the program to exit. For scenario 1), blocking will probably delay the discovery of the problem. Calling close(0) exposes the problem quicker. In this scenario producer just encounter a send failure when running normally. For scenario 2), blocking will expose the problem quick. Calling close(-1) might hide the problem. This scenario might include: a) Unit test for a send failure. b) Message sending during a close() call from a user thread. So as a summary table: Scenario 1) Scenario 2) Blocking Delay problem discovery Guaranteed problem discovery Close(-1) Immediate problem discovery Problem might be hidden Personally I prefer blocking because it seems providing more guarantees and safer. Thanks. Jiangjie (Becket) Qin On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: HI Jiangjie, As far as I understand calling close() in the ioThread is not common, as it may only trigger when we saw some non-retriable error. Hence when user run their program it is unlikely that close() will be triggered and problem will be detected. 
So it seems to me that from the error detection aspect these two options seems to be the same as people will usually detect it from the producer metrics all dropping to 0. Guozhang On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from sender thread (callback): 1. Log an error and close the producer using close(-1) 2. Log an error and block. (Throwing an exception will not work because we catch all the exception thrown from user callback. It will just lead to an error log.) My concern for the first option is that the
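The flag-based shutdown described in this thread can be sketched as follows (the names are illustrative, not the real Sender class): close(0) only flips flags and returns, so it never joins the sender thread and is therefore safe to call from the sender thread's own callbacks.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class SenderSketch implements Runnable {
    final AtomicBoolean running = new AtomicBoolean(true);
    final AtomicBoolean forceClose = new AtomicBoolean(false);
    volatile boolean batchesFailed = false;
    volatile boolean exited = false;

    /** Non-blocking close, standing in for close(0): flips flags only. */
    public void closeNow() {
        forceClose.set(true); // fail incomplete batches on the way out
        running.set(false);   // leave the run loop at its next check
    }

    @Override public void run() {
        while (running.get()) {
            // ... poll the network, run callbacks for the current
            // batch of responses ...
        }
        if (forceClose.get()) {
            batchesFailed = true; // stand-in for failing incomplete batches
        }
        exited = true;
    }

    public static void main(String[] args) {
        SenderSketch s = new SenderSketch();
        s.closeNow(); // as if a callback on the sender thread called close(0)
        s.run();      // the run loop observes the flags and exits cleanly
        System.out.println("exited=" + s.exited + " failed=" + s.batchesFailed);
    }
}
```
Because closeNow() never blocks on a join, the deadlock Jun asked about (the sender thread waiting for itself) cannot occur.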
Re: [KIP-DISCUSSION] KIP-13 Quotas
You are right, shoe-horning status into an error field is a bad idea. While many projects use a single status field to indicate different error and non-error states, it doesn't seem like a good fit for the current Kafka implementation. Do you think that adding a status field to our protocol is feasible at this point? On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava e...@confluent.io wrote: Agreed that trying to shoehorn non-error codes into the error field is a bad idea. It makes it *way* too easy to write code that looks (and should be) correct but is actually incorrect. If necessary, I think it's much better to to spend a couple of extra bytes to encode that information separately (a status or warning section of the response). An indication that throttling is occurring is something I'd expect to be indicated by a bit flag in the response rather than as an error code. Gwen - I think an error code makes sense when the request actually failed. Option B, which Jun was advocating, would have appended the messages successfully. If the rate-limiting case you're talking about had successfully committed the messages, I would say that's also a bad use of error codes. On Mon, Mar 16, 2015 at 10:16 PM, Gwen Shapira gshap...@cloudera.com wrote: We discussed an error code for rate-limiting (which I think made sense), isn't it a similar case? On Mon, Mar 16, 2015 at 10:10 PM, Jay Kreps jay.kr...@gmail.com wrote: My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. 
-Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like

  if(error == none)
    // good, process the request
  else if(error == KNOWN_ERROR_1)
    // handle known error 1
  else if(error == KNOWN_ERROR_2)
    // handle known error 2
  else
    throw Errors.forCode(error).exception(); // or some other default behavior

This works because we have a convention that an error is something that prevented your getting the response, so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever. The problem is that since these are not errors as far as the client is concerned, it should not throw an exception but process the response; but now we have created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop.
Furthermore, I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya -- Thanks, Ewen
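The error-handling convention Jay describes can be sketched in Java. The codes and names below are illustrative stand-ins (not the actual org.apache.kafka.common.protocol.Errors values); the point is why a non-error "status" code breaks the forward-compatible default branch:

```java
// Illustrative sketch of the client error-handling convention described
// above. Error-code values are hypothetical, not Kafka's real codes.
public class ErrorHandlingSketch {
    static final short NONE = 0;
    static final short KNOWN_ERROR_1 = 1;  // e.g. a retriable error
    static final short KNOWN_ERROR_2 = 2;
    static final short THROTTLED = 200;    // hypothetical non-error status code

    static String handle(short error) {
        if (error == NONE)
            return "processed";            // good, process the request
        else if (error == KNOWN_ERROR_1)
            return "handled-1";            // handle known error 1
        else if (error == KNOWN_ERROR_2)
            return "handled-2";            // handle known error 2
        else
            // forward-compatible default: an unknown code means the request
            // failed. A non-error code like THROTTLED lands here and wrongly
            // aborts processing of a response that actually succeeded.
            throw new IllegalStateException("unexpected error code " + error);
    }
}
```

A client compiled against this pattern would throw on a reserved 200-299 "status" code, which is exactly the backward-compatibility problem Gwen points out.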
Re: [DISCUSS] KIP-6 - New reassignment partition logic for re-balancing
Hi, Joe, A couple of comments. 1. When creating a new topic, our replica assignment algorithm tries to achieve a few things: (a) all replicas are spread evenly across brokers; (b) the preferred replica (first replica in the assigned replica list) of all partitions is spread evenly across brokers; (c) the non-preferred replicas are spread out in such a way that if we lose a broker, the load on the failed broker is spread evenly among the remaining brokers. For example, look at the following replica assignment on brokers b1, b2, and b3 (with replication factor 2). Broker b1 will be the leader for partitions p0 and p3. Broker b2 will be the leader for partitions p1 and p4. Broker b3 will be the leader for partitions p2 and p5. If b1 is gone, b2 will take over as the leader for p0 and b3 will take over as the leader for p3. This strategy makes sure that the load is even in the normal case as well as the failure case.

  b1 b2 b3
  p0 p1 p2
  p2 p0 p1
  p3 p4 p5
  p4 p5 p3

The current reassignment strategy actually maintains properties (a), (b) and (c) after the reassignment completes. The new algorithm takes the last few replicas from an overloaded broker and moves them to an underloaded broker. It does reduce the data movement compared with the current algorithm. It also maintains property (a). However, it doesn't seem to explicitly maintain properties (b) and (c). Data movement is a one-time cost. Maintaining balance after the data movement has long term benefit. So, it will be useful to try to maintain these properties even perhaps at the expense of a bit more data movement. Also, I think the new algorithm needs to make sure that we don't move the same replica to a new broker more than once. 2. I am not sure that we need to add a new --rebalance option. All we are changing is the assignment strategy. If that's a better strategy than before, there is no reason for anyone to use the old strategy. So, the new strategy should just be used in the --generate mode.
Thanks, Jun On Wed, Mar 11, 2015 at 12:12 PM, Joe Stein joe.st...@stealth.ly wrote: Sorry for not catching up on this thread earlier. I wanted to do this before the KIP got its updates so we could discuss if need be and not waste more time re-writing/re-working things that folks have issues with. I captured all the comments so far here with responses. So fair assignment by count (taking into account the current partition count of each broker) is very good. However, it's worth noting that all partitions are not created equal. We have actually been performing more rebalance work based on the partition size on disk, as given equal retention of all topics, the size on disk is a better indicator of the amount of traffic a partition gets, both in terms of storage and network traffic. Overall, this seems to be a better balance. Agreed, though this is out of scope (imho) for the motivations of the KIP. The motivations section is blank (that is on me), but honestly that is because we did all the development, went back and forth with Neha on the testing, and then had to back it all into the KIP process... It's a time/resource/scheduling issue, and I hope to update the KIP soon... all of this is in the JIRA and the code patch, so it's not like it is not there, just maybe not in the place where folks are looking, since we changed where folks should look. Initial cut at Motivations: the --generate option is not used by a lot of folks because they don't trust it. Issues such as giving different results sometimes when you run it. Also other feedback from the community that it does not account for specific use cases like adding new brokers and removing brokers (which is where that patch started https://issues.apache.org/jira/browse/KAFKA-1678 but then we changed it after review into just --rebalance https://issues.apache.org/jira/browse/KAFKA-1792). The use case for adding and removing brokers is one that happens in AWS with auto scaling. There are other reasons for this too, of course.
The goal originally was to make what folks are already coding today (with the output of available in the project for the community. Based on the discussion in the JIRA with Neha we all agreed that making it a fair rebalance would fulfill both use cases. In addition to this, I think there is very much a need to have Kafka be rack-aware. That is, to be able to assure that for a given cluster, you never assign all replicas for a given partition in the same rack. This would allow us to guard against maintenances or power failures that affect a full rack of systems (or a given switch). Agreed, though this I think is out of scope for this change and something we can also do in the future. There is more that we have to figure out for rack awareness, specifically answering how we know what rack a broker is on. I really really (really) worry that if we keep trying to put too much into a single change the discussions go into rabbit holes and
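Jun's properties (a)-(c) come from the assignment algorithm run at topic-creation time. The following is a simplified, hypothetical sketch of that style of assignment (loosely modeled on Kafka's AdminUtils logic; the real code also randomizes start indices and handles more edge cases). Preferred replicas go round-robin across brokers (property b), and the follower shift grows after each full round so failover load spreads (property c); it reproduces the b1/b2/b3 example above:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of round-robin replica assignment with an increasing
// follower shift, loosely modeled on Kafka's AdminUtils. Not the real code.
public class ReplicaAssignmentSketch {
    static List<List<Integer>> assign(int[] brokers, int partitions, int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        int n = brokers.length;
        int shift = 0;
        for (int p = 0; p < partitions; p++) {
            if (p > 0 && p % n == 0)
                shift++;  // bump the follower shift after each full round
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers[p % n]);  // preferred replica (leader), round-robin
            for (int k = 0; k < replicationFactor - 1; k++) {
                // followers land a growing distance from the leader, so a
                // failed broker's load is spread over the remaining brokers
                int s = 1 + (shift + k) % (n - 1);
                replicas.add(brokers[(p + s) % n]);
            }
            assignment.add(replicas);
        }
        return assignment;
    }
}
```

With brokers {1, 2, 3}, 6 partitions and replication factor 2, this yields p0:[b1,b2], p1:[b2,b3], p2:[b3,b1], p3:[b1,b3], p4:[b2,b1], p5:[b3,b2], matching Jun's table.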
Re: [KIP-DISCUSSION] KIP-13 Quotas
My concern is that as soon as you start encoding non-error response information into error codes the next question is what to do if two such codes apply (i.e. you have a replica down and the response is quota'd). I think I am trying to argue that error should mean why we failed your request, for which there will really only be one reason, and any other useful information we want to send back is just another field in the response. -Jay On Mon, Mar 16, 2015 at 9:51 PM, Gwen Shapira gshap...@cloudera.com wrote: I think it's not too late to reserve a set of error codes (200-299?) for non-error codes. It won't be backward compatible (i.e. clients that currently do else throw will throw on non-errors), but perhaps it's worthwhile. On Mon, Mar 16, 2015 at 9:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I'd really really really like to avoid that. Having just spent a bunch of time on the clients, using the error codes to encode other information about the response is super dangerous. The error handling is one of the hardest parts of the client (Guozhang chime in here). Generally the error handling looks like

  if(error == none)
    // good, process the request
  else if(error == KNOWN_ERROR_1)
    // handle known error 1
  else if(error == KNOWN_ERROR_2)
    // handle known error 2
  else
    throw Errors.forCode(error).exception(); // or some other default behavior

This works because we have a convention that an error is something that prevented your getting the response, so the default handling case is sane and forward compatible. It is tempting to use the error code to convey information in the success case. For example we could use error codes to encode whether quotas were enforced, whether the request was served out of cache, whether the stock market is up today, or whatever.
The problem is that since these are not errors as far as the client is concerned, it should not throw an exception but process the response; but now we have created an explicit requirement that that error be handled explicitly since it is different. I really think that this kind of information is not an error, it is just information, and if we want it in the response we should do the right thing and add a new field to the response. I think you saw the Samza bug that was literally an example of this happening and leading to an infinite retry loop. Furthermore, I really want to emphasize that hitting your quota in the design that Adi has proposed is actually not an error condition at all. It is totally reasonable in any bootstrap situation to intentionally want to run at the limit the system imposes on you. -Jay On Mon, Mar 16, 2015 at 4:27 PM, Jun Rao j...@confluent.io wrote: It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
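The alternative Jay and Ewen argue for, throttling carried as a separate informational field rather than an error code, could look roughly like this. Field names are hypothetical, not Kafka's actual protocol:

```java
// Sketch of a response that carries throttling as a separate informational
// field, orthogonal to the error code. Hypothetical field names.
public class ResponseStatusSketch {
    final short errorCode;   // non-zero only when the request actually failed
    final boolean throttled; // informational: the request was delayed by quotas

    int throttleEvents = 0;  // stand-in for a client-side metric

    ResponseStatusSketch(short errorCode, boolean throttled) {
        this.errorCode = errorCode;
        this.throttled = throttled;
    }

    // Throttling is observed (e.g. for the monitoring/alerting Jun wants)
    // without disturbing the error-handling path: a throttled-but-successful
    // response is still processed normally.
    String process() {
        if (throttled)
            throttleEvents++;
        if (errorCode != 0)
            throw new IllegalStateException("request failed: " + errorCode);
        return "processed";
    }
}
```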
Re: Review Request 31893: Patch for KAFKA-2013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/ --- (Updated March 16, 2015, 9:13 p.m.) Review request for kafka. Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs (updated) - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
[jira] [Commented] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363957#comment-14363957 ] Yasuhiro Matsuda commented on KAFKA-2013: - Updated reviewboard https://reviews.apache.org/r/31893/diff/ against branch origin/trunk benchmark test for the purgatory Key: KAFKA-2013 URL: https://issues.apache.org/jira/browse/KAFKA-2013 Project: Kafka Issue Type: Test Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda Priority: Trivial Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, KAFKA-2013_2015-03-16_14:13:20.patch We need a micro benchmark test for measuring the purgatory performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2013) benchmark test for the purgatory
[ https://issues.apache.org/jira/browse/KAFKA-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-2013: Attachment: KAFKA-2013_2015-03-16_14:39:07.patch benchmark test for the purgatory Key: KAFKA-2013 URL: https://issues.apache.org/jira/browse/KAFKA-2013 Project: Kafka Issue Type: Test Components: purgatory Reporter: Yasuhiro Matsuda Assignee: Yasuhiro Matsuda Priority: Trivial Attachments: KAFKA-2013.patch, KAFKA-2013_2015-03-16_13:23:38.patch, KAFKA-2013_2015-03-16_14:13:20.patch, KAFKA-2013_2015-03-16_14:39:07.patch We need a micro benchmark test for measuring the purgatory performance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] KIP-13 Quotas
It's probably useful for a client to know whether its requests are throttled or not (e.g., for monitoring and alerting). From that perspective, option B (delay the requests and return an error) seems better. Thanks, Jun On Wed, Mar 4, 2015 at 3:51 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Posted a KIP for quotas in kafka. https://cwiki.apache.org/confluence/display/KAFKA/KIP-13+-+Quotas Appreciate any feedback. Aditya
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Hi Jun, Close(0) will set two flags in the sender: running=false and a newly added forceClose=true. It will also set accumulator.closed=true so no further producer.send() will succeed. The sender thread will finish executing all the callbacks in the current batch of responses, then it will see the forceClose flag. It will just fail all the incomplete batches in the producer and exit. So close(0) is a non-blocking call and the sender thread will not try to join itself in close(0). Thanks. Jiangjie (Becket) Qin On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote: How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks Guozhang. It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks like the problem we want to solve and the purpose we want to achieve is: if a user uses close() in a callback, we want to make the user aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform the user about this mis-usage. The options differ in how we can force the user to take a look at that error log. There are two scenarios: 1. The user does not expect the program to exit. 2. The user expects the program to exit. For scenario 1), blocking will probably delay the discovery of the problem. Calling close(0) exposes the problem quicker. In this scenario the producer just encounters a send failure when running normally. For scenario 2), blocking will expose the problem quickly.
Calling close(-1) might hide the problem. This scenario might include: a) a unit test for a send failure; b) message sending during a close() call from a user thread. So as a summary table:

              Scenario 1)                  Scenario 2)
  Blocking    Delay problem discovery      Guaranteed problem discovery
  Close(-1)   Immediate problem discovery  Problem might be hidden

Personally I prefer blocking because it seems to provide more guarantees and be safer. Thanks. Jiangjie (Becket) Qin On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their program it is unlikely that close() will be triggered and the problem detected. So it seems to me that, from the error-detection aspect, these two options are the same, as people will usually detect the problem from the producer metrics all dropping to 0. Guozhang On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1) 2. Log an error and block. (Throwing an exception will not work because we catch all the exceptions thrown from user callbacks; it will just lead to an error log.) My concern for the first option is that the producer will be closed even though we logged an error. I am wondering if some users would not even take a look at the log if the producer is closed normally. Because from the program's behavior, everything looks good. If that is the case, the error message we logged will probably just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, the user will notice this the first time they run the program. They will probably look at the log to see why the producer could not be closed and will see the error log we put there.
So they will get informed about this mis-usage of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it is more obvious that something was wrong. Jiangjie (Becket) Qin On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah, I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner. On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede
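Jiangjie's description of close(0) at the top of this thread amounts to flipping flags rather than joining the sender thread. A hypothetical sketch (names are stand-ins, not the real KafkaProducer internals):

```java
// Hypothetical sketch of the close(0) semantics discussed in this thread:
// flip flags so the sender fails incomplete batches and exits, instead of
// joining the sender thread (which would deadlock if called from a callback).
public class ProducerCloseSketch {
    volatile boolean running = true;
    volatile boolean forceClose = false;
    volatile boolean accumulatorClosed = false;

    void close(long timeoutMs) {
        accumulatorClosed = true;  // further producer.send() calls fail
        running = false;
        if (timeoutMs == 0) {
            // non-blocking: the sender sees forceClose after finishing the
            // current batch of callbacks, fails all incomplete batches, and
            // exits; safe to call from the sender thread itself
            forceClose = true;
        }
        // for timeoutMs > 0 the caller would join the sender thread here,
        // which is exactly what must not happen inside a callback
    }
}
```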
[jira] [Commented] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364184#comment-14364184 ] Joe Stein commented on KAFKA-2023: -- works ok for me on ubuntu and redhat on two different networks {code} $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka Cloning into 'kafka'... remote: Counting objects: 24607, done. remote: Compressing objects: 100% (9212/9212), done. remote: Total 24607 (delta 14447), reused 19803 (delta 11465) Receiving objects: 100% (24607/24607), 15.62 MiB | 3.46 MiB/s, done. Resolving deltas: 100% (14447/14447), done. Checking connectivity... done. {code} git clone kafka repository requires https - Key: KAFKA-2023 URL: https://issues.apache.org/jira/browse/KAFKA-2023 Project: Kafka Issue Type: Bug Components: website Reporter: Anatoli Fomenko Priority: Minor From http://kafka.apache.org/code.html: Our code is kept in git. You can check it out like this: git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka On CentOS 6.5: {code} $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka Initialized empty Git repository in /home/anatoli/git/kafka/.git/ error: RPC failed; result=22, HTTP code = 405 {code} while: {code} $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka Initialized empty Git repository in /home/anatoli/git/kafka/.git/ remote: Counting objects: 24607, done. remote: Compressing objects: 100% (9212/9212), done. remote: Total 24607 (delta 14449), reused 19801 (delta 11465) Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done. Resolving deltas: 100% (14449/14449), done. {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] KIP-15 close(timeout) for producer
How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks Guozhang. It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks like the problem we want to solve and the purpose we want to achieve is: if a user uses close() in a callback, we want to make the user aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform the user about this mis-usage. The options differ in how we can force the user to take a look at that error log. There are two scenarios: 1. The user does not expect the program to exit. 2. The user expects the program to exit. For scenario 1), blocking will probably delay the discovery of the problem. Calling close(0) exposes the problem quicker. In this scenario the producer just encounters a send failure when running normally. For scenario 2), blocking will expose the problem quickly. Calling close(-1) might hide the problem. This scenario might include: a) a unit test for a send failure; b) message sending during a close() call from a user thread. So as a summary table:

              Scenario 1)                  Scenario 2)
  Blocking    Delay problem discovery      Guaranteed problem discovery
  Close(-1)   Immediate problem discovery  Problem might be hidden

Personally I prefer blocking because it seems to provide more guarantees and be safer. Thanks.
Jiangjie (Becket) Qin On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their program it is unlikely that close() will be triggered and the problem detected. So it seems to me that, from the error-detection aspect, these two options are the same, as people will usually detect the problem from the producer metrics all dropping to 0. Guozhang On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1) 2. Log an error and block. (Throwing an exception will not work because we catch all the exceptions thrown from user callbacks; it will just lead to an error log.) My concern for the first option is that the producer will be closed even though we logged an error. I am wondering if some users would not even take a look at the log if the producer is closed normally. Because from the program's behavior, everything looks good. If that is the case, the error message we logged will probably just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, the user will notice this the first time they run the program. They will probably look at the log to see why the producer could not be closed and will see the error log we put there. So they will get informed about this mis-usage of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it is more obvious that something was wrong.
Jiangjie (Becket) Qin On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah, I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner. On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote: And I also agree it is better if we can make the producer block when close() is called from the sender thread so the user will notice something went wrong. This isn't a great experience either. Why can't we just throw an exception for a behavior we know is incorrect and we'd like the user to know about? Blocking as a means of doing that seems wrong and annoying. On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good or
Re: Review Request 31742: Patch for KAFKA-527
On March 13, 2015, 11:43 p.m., Yasuhiro Matsuda wrote: The inheritance of MessageWriter from BufferingOutputStream is a bit confusing, since it will always use itself in the writePayload function parameter. I feel it is more clear to read the code if we just let MessageWriter contains a var of BufferingOutputStream; and instead of pass in the function logic of writing the message, we can just pass in messages and offsetCounter in the write() call which will then write the messages itself. Yasuhiro Matsuda wrote: It is true that the current code writes only through writePayload. But I wanted MessageWriter to be a subclass of OutputStream to be more generic in case we need to write additional inforation other than messages in future. Guozhang Wang wrote: As for now MessageWriter's only public function is write(key, codec) (valueWritefunction), which is used for writing a single message. Also its private functions withCrc32Prefix / withLengthPrefix is only used for message writing. So it is a bit unclear about your motivation in future extensions. Could you elaborate a bit more on that? I don't know future usages at this point. Besides, withCrc32Prefix uses internal structure of BufferingOutputStream for efficiency. Does this justify the inheritance? If we don't do so, the code will be more cluttered. - Yasuhiro --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/#review76454 --- On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/ --- (Updated March 16, 2015, 10:19 p.m.) Review request for kafka. 
Bugs: KAFKA-527 https://issues.apache.org/jira/browse/KAFKA-527 Repository: kafka Description --- less byte copies Diffs - core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9c694719dc9b515fb3c3ae96435a87b334044272 core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/31742/diff/ Testing --- Thanks, Yasuhiro Matsuda
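Guozhang's composition suggestion (MessageWriter *contains* a buffering stream rather than extending one) could look roughly like this. The names and the length/CRC prefix layout are illustrative simplifications, not Kafka's actual MessageWriter or on-disk message format:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.zip.CRC32;

// Sketch of the composition alternative: the writer holds a buffering
// stream instead of inheriting from it, so callers pass in messages rather
// than a write-function. Hypothetical simplification of the patch's design.
public class ComposedMessageWriter {
    // stands in for BufferingOutputStream in the patch under review
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // write a single payload with a length prefix and a CRC32 prefix
    public void write(byte[] payload) {
        try {
            CRC32 crc = new CRC32();
            crc.update(payload, 0, payload.length);
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(payload.length);        // length prefix
            out.writeInt((int) crc.getValue());  // crc prefix
            out.write(payload);
        } catch (IOException e) {
            throw new RuntimeException(e);       // cannot happen in-memory
        }
    }

    public byte[] toBytes() {
        return buffer.toByteArray();
    }
}
```

The trade-off debated in the review still applies: composition keeps the public surface small, while the inheritance in the patch lets withCrc32Prefix exploit the buffer's internal structure to avoid copies.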
Re: Review Request 31958: Patch for KAFKA-1684
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/#review76640 --- core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/31958/#comment124223 `{want,needs}ClientAuth` can be tricky -- check the javadoc for `SSLEngine.setWantClientAuth`... there are actually only three states: required, requested, not desired, and the last call to `{want,needs}ClientAuth` wins. So, if need is true and want is false, invoking the methods in this order will actually overwrite the need setting. Recommend something like:

    if (sslConnectionConfig.needClientAuth) {
      sslEngine.setNeedClientAuth(true);
    } else {
      sslEngine.setNeedClientAuth(false);
      sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
    }

core/src/main/scala/kafka/network/ssl/SSLChannel.scala https://reviews.apache.org/r/31958/#comment124229 Suppose SSLEngine has written the current message (via `wrap`) to `netOutBuffer`, but the write call in `flush`, when invoked from `handshakeWrap`, didn't write the entire buffer to the underlying socket. Would not `handshakeStatus` as reported from SSLEngine now be `NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the `NEEDS_UNWRAP` case? Or do we not fall through in Scala case statements? core/src/main/scala/kafka/network/ssl/SSLChannel.scala https://reviews.apache.org/r/31958/#comment124235 Not sure about this, but do we want to update the position and limit of the buffer? We flipped it after the last read, but I can't remember if SSLEngine.unwrap will update them if there's an incomplete packet (i.e. in the BUFFER_UNDERFLOW case). Just a few questions on some corner cases... handling all the possibilities when handshaking over NIO is really tough. - Michael Herstine On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/ --- (Updated March 11, 2015, 9:36 p.m.)
Review request for kafka. Bugs: KAFKA-1684 https://issues.apache.org/jira/browse/KAFKA-1684 Repository: kafka Description --- KAFKA-1684. Implement TLS/SSL authentication. Diffs - core/src/main/scala/kafka/network/Channel.scala PRE-CREATION core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION core/src/test/scala/unit/kafka/network/SocketServerTest.scala 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION Diff: https://reviews.apache.org/r/31958/diff/ Testing --- Thanks, Sriharsha Chintalapani
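Michael's point about `{want,need}ClientAuth` can be demonstrated directly against `javax.net.ssl.SSLEngine`: there are only three client-auth states, and the last setter call wins. The class and method names below are illustrative; only the SSLEngine calls are the real API:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.security.NoSuchAlgorithmException;

// Demonstrates that the last call to setNeedClientAuth/setWantClientAuth
// wins: want-after-need silently clears the "required" setting.
public class ClientAuthOrder {
    static SSLEngine newEngine() {
        try {
            return SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    // The problematic order from the review comment.
    static SSLEngine needThenWant() {
        SSLEngine engine = newEngine();
        engine.setNeedClientAuth(true);   // intend: client certs REQUIRED
        engine.setWantClientAuth(false);  // overwrites: now "not desired"
        return engine;
    }

    // The guarded order the reviewer recommends: only touch want when
    // need is not set.
    static SSLEngine configure(boolean need, boolean want) {
        SSLEngine engine = newEngine();
        if (need) {
            engine.setNeedClientAuth(true);
        } else {
            engine.setNeedClientAuth(false);
            engine.setWantClientAuth(want);
        }
        return engine;
    }
}
```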
Re: Review Request 31958: Patch for KAFKA-1684
On March 16, 2015, 9:24 p.m., Michael Herstine wrote: core/src/main/scala/kafka/network/ssl/SSLChannel.scala, line 137 https://reviews.apache.org/r/31958/diff/1/?file=891658#file891658line137 Suppose SSLEngine has written the current message (via `wrap`) to `netOutBuffer`, but the write call in `flush`, when invoked from `handshakeWrap`, didn't write the entire buffer to the underlying socket. Would not `handshakeStatus` as reported from SSLEngine now be `NEEDS_UNWRAP`? And wouldn't that cause us to fall through to the `NEEDS_UNWRAP` case? Or do we not fall through in Scala case statements? Thanks for the review. Ideally it should fall through to NEEDS_UNWRAP, but since Scala case statements don't allow Java-style fall-through, I am looking at alternatives. - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/#review76640 --- On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/ --- (Updated March 11, 2015, 9:36 p.m.) Review request for kafka. Bugs: KAFKA-1684 https://issues.apache.org/jira/browse/KAFKA-1684 Repository: kafka Description --- KAFKA-1684. Implement TLS/SSL authentication.
Diffs - core/src/main/scala/kafka/network/Channel.scala PRE-CREATION core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION core/src/test/scala/unit/kafka/network/SocketServerTest.scala 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION Diff: https://reviews.apache.org/r/31958/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364166#comment-14364166 ] Parth Brahmbhatt commented on KAFKA-1688: - [~jkreps][~junrao] Not sure if you guys had time to review https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface but I have to make a design choice and would appreciate your input. As mentioned in the KIP, I originally thought we would just update the TopicMetadata class to hold the acls and owner, which should allow us to reuse TopicMetadataCache to get the acl information. However, on further inspection I realized that TopicMetadataCache is just serving as the cache for a topic's partition state info, and we have a completely different mechanism for caching and updating topic config entries. Currently the topic config is all about Log configuration, so we have a TopicConfigManager which takes in a Log instance and keeps updating that instance's config as and when the topic config is updated. The topic config update notifications are sent using zk watchers by the Controller. I propose to introduce a TopicConfigCache which will be updated by TopicConfigManager on any config changes. Both the log instance and the authorizer will share an instance of TopicConfigCache and read the config entries from it. The acls and owner of the topic will be stored as part of the topic config. An alternate solution is to modify the TopicMetadataCache so it also has topic configs. The controller would have to send updateTopicMetadataCache requests on both partition changes and config changes. We would have to deprecate TopicConfigManager and the controller code that updates zk state to fire config change watchers. I am currently blocked by this, so I would appreciate any feedback from you guys.
Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
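The TopicConfigCache proposed in the comment above boils down to a single writer (TopicConfigManager, driven by ZK config-change notifications) and two readers (the Log and the Authorizer) sharing one instance. A rough sketch of that shape, with names and API entirely hypothetical — the actual KIP-11 design may differ:

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed TopicConfigCache: the manager is
// the only writer; the Log and the Authorizer hold the same instance
// and only read from it.
public class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // Called by TopicConfigManager on a ZK config-change notification.
    public void update(String topic, Properties props) {
        configs.put(topic, props);
    }

    // Shared read path: Log reads log configs, Authorizer reads acls/owner.
    public Properties get(String topic) {
        return configs.getOrDefault(topic, new Properties());
    }

    public static void main(String[] args) {
        TopicConfigCache cache = new TopicConfigCache();
        Properties p = new Properties();
        p.setProperty("owner", "alice");   // hypothetical config key
        cache.update("payments", p);
        assert "alice".equals(cache.get("payments").getProperty("owner"));
        assert cache.get("unknown").isEmpty();  // unknown topic: empty config
        System.out.println("ok");
    }
}
```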
Re: Review Request 31893: Patch for KAFKA-2013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/ --- (Updated March 16, 2015, 9:39 p.m.) Review request for kafka. Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs (updated) - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
Re: Review Request 31967: Patch for KAFKA-1546
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/#review76641 --- lgtm overall. Minor comments below. core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala https://reviews.apache.org/r/31967/#comment124277 should be not be - can you fix/remove? core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124224 Wonder why this got split. Can you re-optimize imports? core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124225 Can you move the if statement to the next line core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124243 Can we rename the argument to maxLagMs? core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124245 Minor edit: has not read up to the LEO within the last replicaMaxLag ms, then the follower is lagging and should be removed from the ISR core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124251 (Not part of your change, but could you change [%s,%d] to %s and replace topic, partitionId to TopicAndPartition(topic, partitionId)? We are trying to adopt a uniform convention everywhere in printing topic-partition and have been making these changes gradually (as they appear). core/src/main/scala/kafka/cluster/Partition.scala https://reviews.apache.org/r/31967/#comment124252 same here core/src/main/scala/kafka/cluster/Replica.scala https://reviews.apache.org/r/31967/#comment124256 Can you rename this to lagBeginTimeMsUnderlying? core/src/main/scala/kafka/cluster/Replica.scala https://reviews.apache.org/r/31967/#comment124254 read up to the log end offset snapshot when the read was initiated ... core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/31967/#comment124260 Can we rename this to logEndOffsetBeforeRead? 
Also, can we just make do with the Long (offset) instead of the entire LogOffsetMetadata?

- Joel Koshy

On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/ --- (Updated March 16, 2015, 6:32 p.m.) Review request for kafka. Bugs: KAFKA-1546 https://issues.apache.org/jira/browse/KAFKA-1546 Repository: kafka

Description --- PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica last read up to the LEO
- Using the lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments

Diffs - core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 Diff: https://reviews.apache.org/r/31967/diff/ Testing --- Thanks, Aditya Auradkar
[jira] [Created] (KAFKA-2023) git clone kafka repository requires https
Anatoli Fomenko created KAFKA-2023: -- Summary: git clone kafka repository requires https Key: KAFKA-2023 URL: https://issues.apache.org/jira/browse/KAFKA-2023 Project: Kafka Issue Type: Bug Components: website Reporter: Anatoli Fomenko Priority: Minor

From http://kafka.apache.org/code.html: Our code is kept in git. You can check it out like this: git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka

On CentOS 6.5:
{code}
$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
error: RPC failed; result=22, HTTP code = 405
{code}
while:
{code}
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
Resolving deltas: 100% (14449/14449), done.
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2023) git clone kafka repository requires https
[ https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anatoli Fomenko updated KAFKA-2023: --- Attachment: KAFKA-2023.patch Please review the patch. Thank you.

git clone kafka repository requires https - Key: KAFKA-2023 URL: https://issues.apache.org/jira/browse/KAFKA-2023 Project: Kafka Issue Type: Bug Components: website Reporter: Anatoli Fomenko Priority: Minor Attachments: KAFKA-2023.patch

From http://kafka.apache.org/code.html: Our code is kept in git. You can check it out like this: git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka

On CentOS 6.5:
{code}
$ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
error: RPC failed; result=22, HTTP code = 405
{code}
while:
{code}
$ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
Initialized empty Git repository in /home/anatoli/git/kafka/.git/
remote: Counting objects: 24607, done.
remote: Compressing objects: 100% (9212/9212), done.
remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
Resolving deltas: 100% (14449/14449), done.
{code}
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31958: Patch for KAFKA-1684
On March 16, 2015, 9:24 p.m., Michael Herstine wrote: core/src/main/scala/kafka/network/SocketServer.scala, line 318 https://reviews.apache.org/r/31958/diff/1/?file=891657#file891657line318 `{want,needs}ClientAuth` can be tricky -- check the javadoc for `SSLEngine.setWantClientAuth`... there are actually only three states: required, requested, and not desired, and the last call to `{want,needs}ClientAuth` wins. So, if needs is true and wants is false, invoking the methods in this order will actually overwrite the needs setting. Recommend something like:

if (sslConnectionConfig.needClientAuth) {
  sslEngine.setNeedClientAuth(true);
} else {
  sslEngine.setNeedClientAuth(false);
  sslEngine.setWantClientAuth(sslConnectionConfig.wantClientAuth);
}

Thanks for pointing it out. I'll fix that.

- Sriharsha

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/#review76640 --- On March 11, 2015, 9:36 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31958/ --- (Updated March 11, 2015, 9:36 p.m.) Review request for kafka. Bugs: KAFKA-1684 https://issues.apache.org/jira/browse/KAFKA-1684 Repository: kafka Description --- KAFKA-1684. Implement TLS/SSL authentication.
Diffs - core/src/main/scala/kafka/network/Channel.scala PRE-CREATION core/src/main/scala/kafka/network/SocketServer.scala 76ce41aed6e04ac5ba88395c4d5008aca17f9a73 core/src/main/scala/kafka/network/ssl/SSLChannel.scala PRE-CREATION core/src/main/scala/kafka/network/ssl/SSLConnectionConfig.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/KafkaServer.scala dddef938fabae157ed8644536eb1a2f329fb42b7 core/src/main/scala/kafka/utils/SSLAuthUtils.scala PRE-CREATION core/src/test/scala/unit/kafka/network/SocketServerTest.scala 0af23abf146d99e3d6cf31e5d6b95a9e63318ddb core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/utils/TestSSLUtils.scala PRE-CREATION Diff: https://reviews.apache.org/r/31958/diff/ Testing --- Thanks, Sriharsha Chintalapani
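The "last call wins" behavior the reviewer describes can be checked directly against `javax.net.ssl.SSLEngine`: calling `setWantClientAuth` after `setNeedClientAuth` silently clears the need flag, which is exactly why the ordering guard above matters. A small self-contained check (uses the default SSLContext; no connection is needed):

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class ClientAuthOrder {
    public static void main(String[] args) throws Exception {
        SSLEngine engine = SSLContext.getDefault().createSSLEngine();

        // Intending "client auth required", then also calling setWantClientAuth:
        engine.setNeedClientAuth(true);
        engine.setWantClientAuth(true);

        // Per the SSLEngine javadoc, the later call overrides the earlier
        // one -- "need" has now been silently reset to false.
        assert !engine.getNeedClientAuth();
        assert engine.getWantClientAuth();
        System.out.println("need=" + engine.getNeedClientAuth()
                + " want=" + engine.getWantClientAuth());
    }
}
```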
Re: Review Request 31893: Patch for KAFKA-2013
On March 16, 2015, 5:17 p.m., Jun Rao wrote: core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala, line 193 https://reviews.apache.org/r/31893/diff/1/?file=890190#file890190line193 Is there a particular reason that we need to overwrite isCompleted()? Typically, only tryComplete() and onComplete() need to be overwritten in a subclass of DelayedOperation. Actually, I am not sure how we complete the requests before the timeout is reached since there is no explicit call for tryComplete()?

Yasuhiro Matsuda wrote: isCompleted checks whether the current time has passed the scheduled completion time, rather than whether forceComplete has been called. That makes isCompleted always accurate. Purgatory checks the watcher lists every so often and calls isCompleted. Calling forceComplete from isCompleted ensures that a completed request is removed from the timing wheels in the new implementation. In terms of timing, this is not very accurate because completed requests may stay longer than they should. This doesn't affect the old implementation at all, but it may impose some overhead on the new implementation. Still, the new one outperforms the old one. It would be ideal to call forceComplete exactly at the scheduled completion time, but that requires another timer (DelayQueue or Timer), which I think is too much overhead just to measure purgatory performance; it is also hard to guarantee such a timer works accurately in this test setting. The watcher list check seems to happen frequently enough in both the new and old implementations. The average delay from the completion time to the actual forceComplete call is several tens of milliseconds (low request rate) to sub-millisecond (high request rate).

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31893/#review76566 --- On March 16, 2015, 9:39 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/31893/ --- (Updated March 16, 2015, 9:39 p.m.) Review request for kafka. Bugs: KAFKA-2013 https://issues.apache.org/jira/browse/KAFKA-2013 Repository: kafka Description --- purgatory micro benchmark Diffs - core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala PRE-CREATION Diff: https://reviews.apache.org/r/31893/diff/ Testing --- Thanks, Yasuhiro Matsuda
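The behavior Yasuhiro describes — `isCompleted` keyed to the scheduled completion time rather than to an explicit `forceComplete` call, observed with some delay by purgatory's periodic watcher-list scan — can be sketched as follows. This is a hypothetical fake operation for illustration, not the actual benchmark code:

```java
// Hypothetical sketch of a benchmark-style fake delayed operation:
// isCompleted() becomes true once the scheduled completion time passes,
// so the periodic watcher-list scan observes completion some time after
// the scheduled instant rather than exactly at it.
public class ScheduledDelayedOp {
    private final long completionTimeMs;

    ScheduledDelayedOp(long nowMs, long latencyMs) {
        this.completionTimeMs = nowMs + latencyMs;
    }

    // Accurate by construction: depends only on the clock, not on
    // whether anyone has called forceComplete yet.
    boolean isCompleted(long nowMs) {
        return nowMs >= completionTimeMs;
    }

    public static void main(String[] args) {
        ScheduledDelayedOp op = new ScheduledDelayedOp(1000L, 50L);
        assert !op.isCompleted(1049L); // before the scheduled completion
        assert op.isCompleted(1050L);  // at/after the scheduled completion
        System.out.println("ok");
    }
}
```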
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Thanks Guozhang. It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin

On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks like the problem we want to solve and the purpose we want to achieve is: if the user calls close() in a callback, we want to make the user aware that they should use close(0) instead of close() in the callback. We have agreed that we will log an error to inform the user about this mis-usage. The options differ in how we can force the user to take a look at that error log. There are two scenarios: 1. The user does not expect the program to exit. 2. The user expects the program to exit. For scenario 1), blocking will probably delay the discovery of the problem, while calling close(0) exposes the problem quicker. In this scenario the producer just encounters a send failure while running normally. For scenario 2), blocking will expose the problem quickly, while calling close(-1) might hide the problem. This scenario might include: a) a unit test for a send failure; b) message sending during a close() call from a user thread. So as a summary table:

           Scenario 1)                  Scenario 2)
Blocking   Delays problem discovery     Guaranteed problem discovery
close(-1)  Immediate problem discovery  Problem might be hidden

Personally I prefer blocking because it seems to provide more guarantees and to be safer. Thanks. Jiangjie (Becket) Qin

On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their program it is unlikely that close() will be triggered and the problem will be detected.
So it seems to me that from the error detection aspect these two options are the same, as people will usually detect the problem from the producer metrics all dropping to 0. Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1). 2. Log an error and block. (Throwing an exception will not work because we catch all exceptions thrown from user callbacks; it would just lead to an error log.) My concern with the first option is that the producer will be closed even though we logged an error. I am wondering whether some users would not even take a look at the log if the producer closed normally, because from the program's behavior everything looks good. If that is the case, the error message we logged will probably just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, users will notice it the first time they run the program. They will probably look at the log to see why the producer could not be closed, and they will see the error log we put there. So they will be informed about this mis-usage of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it makes it more obvious that something is wrong. Jiangjie (Becket) Qin

On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah, I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner.
On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote: And I also agree it is better if we can make producer block when close() is called from sender thread so user will notice something went wrong. This isn't a great experience either. Why can't we just throw an exception for a behavior we know is incorrect and we'd like the user to know. Blocking as a means of doing that seems wrong and annoying. On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good or alternately throwing an exception directly from close(). Basically I would just worry about subtly doing something slightly different from what the user asked for as it will be hard to notice that behavior difference. -Jay On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I have modified the KIP
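The mis-use being debated above — calling a blocking `close()` from the producer's own sender/I/O thread — is detectable at runtime by comparing threads. A hedged sketch of such a guard follows; the names are hypothetical and the actual KIP-15 resolution may behave differently (block vs. degrade to close(0)):

```java
public class ProducerCloseGuard {
    private final Thread ioThread;

    ProducerCloseGuard(Thread ioThread) { this.ioThread = ioThread; }

    // Returns the effective close timeout: from a user thread, honor the
    // requested blocking close; from the sender/callback thread, log an
    // error and degrade to close(0) so the sender cannot block on itself.
    long effectiveCloseTimeoutMs(long requestedMs) {
        if (Thread.currentThread() == ioThread) {
            System.err.println("ERROR: close() invoked from the producer "
                    + "I/O thread; use close(0) in callbacks");
            return 0L;
        }
        return requestedMs;
    }

    public static void main(String[] args) throws Exception {
        // Simulate the callback case: the current thread IS the io thread.
        ProducerCloseGuard guard =
                new ProducerCloseGuard(Thread.currentThread());
        assert guard.effectiveCloseTimeoutMs(5000L) == 0L;

        // A genuine user thread keeps its requested timeout.
        final long[] result = new long[1];
        Thread user = new Thread(() ->
                result[0] = guard.effectiveCloseTimeoutMs(5000L));
        user.start();
        user.join();
        assert result[0] == 5000L;
        System.out.println("ok");
    }
}
```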
[jira] [Commented] (KAFKA-527) Compression support does numerous byte copies
[ https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14364085#comment-14364085 ] Yasuhiro Matsuda commented on KAFKA-527: Updated reviewboard https://reviews.apache.org/r/31742/diff/ against branch origin/trunk Compression support does numerous byte copies - Key: KAFKA-527 URL: https://issues.apache.org/jira/browse/KAFKA-527 Project: Kafka Issue Type: Bug Components: compression Reporter: Jay Kreps Assignee: Yasuhiro Matsuda Priority: Critical Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, java.hprof.snappy.text The data path for compressing or decompressing messages is extremely inefficient. We do something like 7 (?) complete copies of the data, often for simple things like adding a 4 byte size to the front. I am not sure how this went by unnoticed. This is likely the root cause of the performance issues we saw in doing bulk recompression of data in mirror maker. The mismatch between the InputStream and OutputStream interfaces and the Message/MessageSet interfaces which are based on byte buffers is the cause of many of these. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-527) Compression support does numerous byte copies
[ https://issues.apache.org/jira/browse/KAFKA-527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yasuhiro Matsuda updated KAFKA-527: --- Attachment: KAFKA-527_2015-03-16_15:19:29.patch Compression support does numerous byte copies - Key: KAFKA-527 URL: https://issues.apache.org/jira/browse/KAFKA-527 Project: Kafka Issue Type: Bug Components: compression Reporter: Jay Kreps Assignee: Yasuhiro Matsuda Priority: Critical Attachments: KAFKA-527.message-copy.history, KAFKA-527.patch, KAFKA-527_2015-03-16_15:19:29.patch, java.hprof.no-compression.txt, java.hprof.snappy.text The data path for compressing or decompressing messages is extremely inefficient. We do something like 7 (?) complete copies of the data, often for simple things like adding a 4 byte size to the front. I am not sure how this went by unnoticed. This is likely the root cause of the performance issues we saw in doing bulk recompression of data in mirror maker. The mismatch between the InputStream and OutputStream interfaces and the Message/MessageSet interfaces which are based on byte buffers is the cause of many of these. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-1546: - Attachment: KAFKA-1546_2015-03-16_11:31:39.patch Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
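The time-based scheme proposed in this ticket boils down to one predicate: a follower is out of sync if it has been behind the leader's log end offset continuously for longer than `replica.lag.max.ms`. A minimal sketch of that predicate — simplified; the actual patch tracks this state per Replica:

```java
public class ReplicaLag {
    // lagBeginTimeMs: when the follower fell behind the leader's LEO,
    // or -1 if it is currently caught up. Throughput no longer matters:
    // low- and high-volume partitions share the same time bound.
    static boolean isOutOfSync(long lagBeginTimeMs, long nowMs, long maxLagMs) {
        return lagBeginTimeMs >= 0 && nowMs - lagBeginTimeMs > maxLagMs;
    }

    public static void main(String[] args) {
        long maxLagMs = 10_000L;
        assert !isOutOfSync(-1L, 50_000L, maxLagMs);     // caught up
        assert !isOutOfSync(45_000L, 50_000L, maxLagMs); // behind for only 5s
        assert isOutOfSync(30_000L, 50_000L, maxLagMs);  // behind 20s: shrink ISR
        System.out.println("ok");
    }
}
```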
[jira] [Commented] (KAFKA-1546) Automate replica lag tuning
[ https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363672#comment-14363672 ] Aditya A Auradkar commented on KAFKA-1546: -- Updated reviewboard https://reviews.apache.org/r/31967/diff/ against branch origin/trunk Automate replica lag tuning --- Key: KAFKA-1546 URL: https://issues.apache.org/jira/browse/KAFKA-1546 Project: Kafka Issue Type: Improvement Components: replication Affects Versions: 0.8.0, 0.8.1, 0.8.1.1 Reporter: Neha Narkhede Assignee: Aditya Auradkar Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch Currently, there is no good way to tune the replica lag configs to automatically account for high and low volume topics on the same cluster. For the low-volume topic it will take a very long time to detect a lagging replica, and for the high-volume topic it will have false-positives. One approach to making this easier would be to have the configuration be something like replica.lag.max.ms and translate this into a number of messages dynamically based on the throughput of the partition. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31967: Patch for KAFKA-1546
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/ --- (Updated March 16, 2015, 6:31 p.m.) Review request for kafka. Bugs: KAFKA-1546 https://issues.apache.org/jira/browse/KAFKA-1546 Repository: kafka

Description (updated) --- PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica last read up to the LEO
- Using the lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments

Diffs (updated) - core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 Diff: https://reviews.apache.org/r/31967/diff/ Testing --- Thanks, Aditya Auradkar
Re: Review Request 31742: Patch for KAFKA-527
On March 13, 2015, 11:43 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/message/MessageWriter.scala, line 29 https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line29 Add a check that codec should not be NoCompression.

Why should the codec not be NoCompression? The code works with NoCompression, too.

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/message/MessageWriter.scala, line 97 https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line97 Could we use comments in /** * */ format?

Is this comment style prohibited? This class is for internal use with fairly localized usage.

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/message/MessageWriter.scala, line 117 https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line117 We can just pass in the Byte here.

This is a contract of OutputStream.

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/message/MessageWriter.scala, line 135 https://reviews.apache.org/r/31742/diff/1/?file=884487#file884487line135 Better group the private functions together after the public functions.

Well, I don't think it is a particularly better way to organize code, but if you insist I can change it. The Kafka code base doesn't seem to follow that convention...

On March 13, 2015, 11:43 p.m., Guozhang Wang wrote: The inheritance of MessageWriter from BufferingOutputStream is a bit confusing, since it will always use itself in the writePayload function parameter. I feel it is more clear to read the code if we just let MessageWriter contain a var of BufferingOutputStream; and instead of passing in the function logic of writing the message, we can just pass in messages and offsetCounter in the write() call, which will then write the messages itself.

It is true that the current code writes only through writePayload.
But I wanted MessageWriter to be a subclass of OutputStream to be more generic in case we need to write additional information other than messages in the future.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/#review76454 --- On March 4, 2015, 7:43 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/ --- (Updated March 4, 2015, 7:43 p.m.) Review request for kafka. Bugs: KAFKA-527 https://issues.apache.org/jira/browse/KAFKA-527 Repository: kafka Description --- less byte copies Diffs - core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9c694719dc9b515fb3c3ae96435a87b334044272 core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/31742/diff/ Testing --- Thanks, Yasuhiro Matsuda
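The copy that KAFKA-527 singles out — "adding a 4 byte size to the front" — can be avoided by reserving the length slot up front and patching it after the payload is written, which is the essence of the buffering-writer approach under review. A simplified sketch of that reserve-and-patch idea (not the actual MessageWriter API; the real BufferingOutputStream streams into chained internal buffers, whereas this sketch only demonstrates the reserved slot):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class SizePrefixDemo {
    // Write a 4-byte placeholder, stream the payload after it, then patch
    // the length back in -- the payload is never shifted to make room for
    // a size prefix.
    static byte[] writeWithSizePrefix(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0); out.write(0); out.write(0); out.write(0); // reserve slot
        out.write(payload, 0, payload.length);
        byte[] buf = out.toByteArray();
        ByteBuffer.wrap(buf).putInt(0, payload.length);          // patch size
        return buf;
    }

    public static void main(String[] args) {
        byte[] msg = writeWithSizePrefix(new byte[]{1, 2, 3});
        assert msg.length == 7;                       // 4-byte size + 3 bytes
        assert ByteBuffer.wrap(msg).getInt(0) == 3;   // patched-in length
        System.out.println("ok");
    }
}
```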
[jira] [Commented] (KAFKA-2020) I expect ReplicaNotAvailableException to have proper Javadocs
[ https://issues.apache.org/jira/browse/KAFKA-2020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14363678#comment-14363678 ] Jun Rao commented on KAFKA-2020: The following is the protocol for TopicMetadataResponse. Currently, we do the following: 1. If the leader is not available, we set the partition level error code to LeaderNotAvailable. 2. If a non-leader replica is not available, we take that replica out of the assigned replica list and isr in the response. As an indication for doing that, we set the partition level error code to ReplicaNotAvailable. This has a few problems. First, ReplicaNotAvailable probably shouldn't be an error, at least for the normal producer/consumer clients that just want to find out the leader. Second, it can happen that both the leader and another replica are not available at the same time. There is no error code to indicate both. Third, even if a replica is not available, it's still useful to return its replica id since some clients (e.g. admin tools) may still make use of it. One way to address this issue is to always return the replica id for the leader, assigned replicas, and isr regardless of whether the corresponding broker is live or not. Since we also return the list of live brokers, the client can figure out whether a leader or a replica is live and act accordingly. This way, we don't need to set the partition level error code when the leader or a replica is not available. This doesn't change the wire protocol, but does change the semantics. So, a new version of the protocol is needed. Since we are debating evolving TopicMetadataRequest in KIP-4, we can potentially piggyback on that.
{code}
MetadataResponse = [Broker][TopicMetadata]
Broker = NodeId Host Port   (any number of brokers may be returned)
  NodeId = int32
  Host = string
  Port = int32
TopicMetadata = TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode = int16
PartitionMetadata = PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode = int16
  PartitionId = int32
  Leader = int32
  Replicas = [int32]
  Isr = [int32]
{code}

I expect ReplicaNotAvailableException to have proper Javadocs - Key: KAFKA-2020 URL: https://issues.apache.org/jira/browse/KAFKA-2020 Project: Kafka Issue Type: Bug Components: consumer Reporter: Chris Riccomini Assignee: Neha Narkhede

It looks like ReplicaNotAvailableException was copy and pasted from LeaderNotAvailableException. The Javadocs were never changed. This means that users think that ReplicaNotAvailableException signifies that leaders are not available. This is very different from "I can ignore this exception", which is what the Kafka protocol docs say to do with ReplicaNotAvailableException. Related: what's the point of ReplicaNotAvailableException if it's supposed to be ignored? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
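Under the grammar in the comment above, a PartitionMetadata entry is fixed-width fields followed by two int32 arrays (Kafka protocol arrays are an int32 count followed by the elements). A minimal decoder sketch, with error handling omitted and the sample bytes hand-encoded for illustration:

```java
import java.nio.ByteBuffer;

public class PartitionMetadataCodec {
    // Decode an [int32] array per the grammar above:
    // an int32 element count followed by that many int32 values.
    static int[] readInt32Array(ByteBuffer buf) {
        int n = buf.getInt();
        int[] out = new int[n];
        for (int i = 0; i < n; i++) out[i] = buf.getInt();
        return out;
    }

    public static void main(String[] args) {
        // Hand-encoded PartitionMetadata: errorCode=0, partition=0,
        // leader=1, replicas=[1,2], isr=[1]
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.putShort((short) 0).putInt(0).putInt(1);
        buf.putInt(2).putInt(1).putInt(2); // replicas
        buf.putInt(1).putInt(1);           // isr
        buf.flip();

        short errorCode = buf.getShort();  // PartitionErrorCode = int16
        int partition = buf.getInt();      // PartitionId = int32
        int leader = buf.getInt();         // Leader = int32
        int[] replicas = readInt32Array(buf);
        int[] isr = readInt32Array(buf);

        assert errorCode == 0 && partition == 0 && leader == 1;
        assert replicas.length == 2 && isr.length == 1;
        System.out.println("ok");
    }
}
```

Jun's proposal would keep this layout intact and only change the semantics: unavailable replica ids stay in `Replicas`/`Isr`, and liveness is resolved against the broker list.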
Re: Review Request 31967: Patch for KAFKA-1546
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31967/ --- (Updated March 16, 2015, 6:32 p.m.)

Review request for kafka.

Bugs: KAFKA-1546 https://issues.apache.org/jira/browse/KAFKA-1546

Repository: kafka

Description (updated) ---
PATCH for KAFKA-1546. Brief summary of changes:
- Added a lagBegin metric inside Replica to track, in terms of time, how long the replica has not read up to the LEO
- Using the lag-begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult, corresponding to the LEO before actually reading from the log
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments

Diffs ---
core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4
core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb
core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d
core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39
core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973
core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8
core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1

Diff: https://reviews.apache.org/r/31967/diff/

Testing ---

Thanks, Aditya Auradkar
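The time-based ISR criterion summarized in the description above can be sketched roughly as follows. This is a hypothetical illustration of the idea, not the actual patch code; the names and parameters are assumptions. Instead of counting how many messages a follower is behind, the leader records when the follower stopped reading up to the log end offset (LEO) and shrinks the ISR once that lag has persisted longer than the configured maximum.

```java
// Hypothetical sketch of a lag-time-based ISR shrink check
// (illustrative names; not the code in the KAFKA-1546 patch).
public class IsrLagCheck {

    /**
     * @param caughtUp   true if the follower's last fetch reached the leader's LEO
     * @param lagBeginMs timestamp at which the follower first fell behind the LEO
     * @param nowMs      current time in milliseconds
     * @param maxLagMs   maximum tolerated lag duration
     */
    static boolean shouldShrinkFromIsr(boolean caughtUp, long lagBeginMs, long nowMs, long maxLagMs) {
        if (caughtUp) {
            return false; // reading up to the LEO resets the lag window
        }
        return nowMs - lagBeginMs > maxLagMs;
    }

    public static void main(String[] args) {
        // Follower has been lagging for 15s against a 10s limit: shrink.
        System.out.println(shouldShrinkFromIsr(false, 0, 15_000, 10_000)); // prints true
        // Follower caught up to the LEO: stays in the ISR regardless of history.
        System.out.println(shouldShrinkFromIsr(true, 0, 15_000, 10_000));  // prints false
    }
}
```

This removes the need for a max-lag-messages config, since a slow-but-steadily-fetching follower that keeps reaching the LEO never trips the check.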
Re: [DISCUSSION] KIP-15 close(timeout) for producer
It seems that the problem we want to solve and the purpose we want to achieve is: if a user calls close() in a callback, we want to make the user aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform the user about this mis-usage. The options differ in how we can force the user to take a look at that error log. There are two scenarios:

1. The user does not expect the program to exit.
2. The user expects the program to exit.

For scenario 1), blocking will probably delay the discovery of the problem, while calling close(0) exposes the problem more quickly. In this scenario the producer just encounters a send failure while running normally. For scenario 2), blocking will expose the problem quickly, while calling close(-1) might hide the problem. This scenario might include: a) a unit test for a send failure, or b) message sending during a close() call from a user thread. So, as a summary table:

            Scenario 1)                  Scenario 2)
Blocking    Delayed problem discovery    Guaranteed problem discovery
Close(-1)   Immediate problem discovery  Problem might be hidden

Personally I prefer blocking because it seems to provide more guarantees and is safer. Thanks. Jiangjie (Becket) Qin

On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their programs it is unlikely that close() will be triggered and the problem detected. So it seems to me that from the error-detection aspect these two options are the same, as people will usually detect the problem from the producer metrics all dropping to 0. Guozhang

On Mon, Mar 16, 2015 at 9:52 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems there are two options we can choose from when close() is called from the sender thread (callback): 1. Log an error and close the producer using close(-1). 2. Log an error and block.
(Throwing an exception will not work because we catch all exceptions thrown from user callbacks; it would just lead to an error log.) My concern with the first option is that the producer will be closed even though we logged an error. I wonder whether some users would not even look at the log if the producer closes normally, because from the program's behavior everything looks good. If that is the case, the error message we logged will probably just be ignored until some day when people check the log and see it. As for the second option, because the producer does not close but blocks, users will notice it the first time they run the program. They will probably look at the log to see why the producer could not be closed, and they will see the error log we put there. So they will be informed about this mis-usage of close() in the sender thread the first time they run the code instead of some time later. Personally I prefer the second one because it makes it more obvious that something is wrong. Jiangjie (Becket) Qin

On 3/15/15, 4:27 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah, I agree we should not silently change the behavior of the function with the given parameters; and I would prefer error-logging-and-shutdown over blocking when close(0) is used, since as Neha suggested blocking would also not proceed with sending any data, but will just let users realize the issue later rather than sooner.

On Sun, Mar 15, 2015 at 3:25 PM, Neha Narkhede n...@confluent.io wrote: "And I also agree it is better if we can make producer block when close() is called from sender thread so user will notice something went wrong." This isn't a great experience either. Why can't we just throw an exception for a behavior we know is incorrect and would like the user to know about? Blocking as a means of doing that seems wrong and annoying.

On Sun, Mar 15, 2015 at 11:56 AM, Jay Kreps jay.kr...@gmail.com wrote: Cool. I think blocking is good, or alternately throwing an exception directly from close().
Basically I would just worry about subtly doing something slightly different from what the user asked for, as it will be hard to notice that behavior difference. -Jay

On Sat, Mar 14, 2015 at 5:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I have modified the KIP as you suggested. I think as long as we have a consistent definition of timeout across Kafka interfaces, there would be no problem. And I also agree it is better if we can make the producer block when close() is called from the sender thread so the user will notice something went wrong. Thanks. Jiangjie (Becket) Qin

On 3/14/15, 11:37 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jiangjie, I think it is going to be very confusing that close(0) waits indefinitely and close(-1) waits for 0. I understand this
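The guard being debated in this thread can be sketched as follows. This is a hypothetical illustration, not actual KafkaProducer code; the class name and structure are assumptions. The key observation is that the producer can compare the calling thread against its own sender (I/O) thread to detect a close() issued from inside a send callback, which is the case where a blocking close would deadlock.

```java
// Hypothetical sketch of detecting close() called from the producer's own
// sender thread (i.e. from inside a user callback). What to do on detection
// -- log an error and block, or log and fall back to close(0) -- is exactly
// the trade-off discussed in this thread.
public class SenderThreadCloseGuard {
    private final Thread senderThread;

    public SenderThreadCloseGuard(Thread senderThread) {
        this.senderThread = senderThread;
    }

    /** True when the current thread is the producer's sender (I/O) thread. */
    public boolean closedFromCallback() {
        return Thread.currentThread() == senderThread;
    }

    public static void main(String[] args) throws InterruptedException {
        // Pretend the current thread is the sender thread.
        SenderThreadCloseGuard guard = new SenderThreadCloseGuard(Thread.currentThread());
        System.out.println(guard.closedFromCallback()); // prints true: same thread

        // From any other thread the check passes and a blocking close is safe.
        Thread other = new Thread(() -> System.out.println(guard.closedFromCallback())); // prints false
        other.start();
        other.join();
    }
}
```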
Re: Review Request 32061: WIP for KAFKA-2015 plus some minor fixes in new consumer
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32061/#review76630 ---

Ship it!

Ship It!

- Onur Karaman

On March 13, 2015, 10:26 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32061/ --- (Updated March 13, 2015, 10:26 p.m.)

Review request for kafka.

Bugs: KAFKA-2015 https://issues.apache.org/jira/browse/KAFKA-2015

Repository: kafka

Description ---
NOTE: without the rebalance implementation, a single consumer will try to subscribe to all partitions of the given topic.

Diffs ---
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java 436f9b2a843bc8c44d17403f5880b6736a5d56a8
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 8b71fbad5c404d3f23137e153d6376de9f82b823
config/tools-log4j.properties 52f07c96019b4083fc78f62cfb0a81080327e847
core/src/main/scala/kafka/consumer/BaseConsumer.scala PRE-CREATION
core/src/main/scala/kafka/tools/ConsoleConsumer.scala 910691e88ccc66a1542d0ea85bb2f732861d805e
core/src/main/scala/kafka/tools/ConsoleProducer.scala 00265f9f4a4b6c6a9aa023e5be5faf297f77bf31

Diff: https://reviews.apache.org/r/32061/diff/

Testing ---

Thanks, Guozhang Wang
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Thanks Jiangjie. After talking to you offline about this, I have been convinced and have changed my preference to blocking. The immediate-shutdown approach does have some unsafeness in some cases. Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It seems that the problem we want to solve and the purpose we want to achieve is: if a user calls close() in a callback, we want to make the user aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform the user about this mis-usage. The options differ in how we can force the user to take a look at that error log. There are two scenarios: 1. The user does not expect the program to exit. 2. The user expects the program to exit. For scenario 1), blocking will probably delay the discovery of the problem, while calling close(0) exposes the problem more quickly. For scenario 2), blocking will expose the problem quickly, while calling close(-1) might hide the problem. Personally I prefer blocking because it seems to provide more guarantees and is safer. Thanks. Jiangjie (Becket) Qin

On 3/16/15, 10:11 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Jiangjie, As far as I understand, calling close() in the ioThread is not common, as it may only trigger when we see some non-retriable error. Hence when users run their programs it is unlikely that close() will be triggered and the problem detected. So it seems to me that from the error-detection aspect these two options are the same, as people will usually detect the problem from the producer metrics all dropping to 0.
[jira] [Created] (KAFKA-2022) simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails instead of being trapped in the fetchResponse a
Muqeet Mohammed Ali created KAFKA-2022: -- Summary: simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails instead of being trapped in the fetchResponse api while consuming messages Key: KAFKA-2022 URL: https://issues.apache.org/jira/browse/KAFKA-2022 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8.2.1 Environment: 3 Linux nodes, with both zookeeper and brokers running under respective users on each. Reporter: Muqeet Mohammed Ali Assignee: Neha Narkhede

simpleconsumer.fetch(req) throws a java.nio.channels.ClosedChannelException: null exception when the original leader fails, instead of the failure being trapped in the fetchResponse API while consuming messages. My understanding was that any fetch failure could be detected via the fetchResponse.hasError() call and then handled, in this case by fetching the new leader. Below is the relevant code snippet from the simple consumer, with comments marking the line causing the exception. Can you please comment on this?

{code}
if (simpleconsumer == null) {
    simpleconsumer = new SimpleConsumer(leaderAddress.getHostName(), leaderAddress.getPort(),
        consumerTimeout, consumerBufferSize, consumerId);
}
FetchRequest req = new FetchRequestBuilder().clientId(getConsumerId())
    .addFetch(topic, partition, offsetManager.getTempOffset(), consumerBufferSize)
    // Note: the fetchSize might need to be increased
    // if large batches are written to Kafka
    .build();
// exception is thrown at the below line
FetchResponse fetchResponse = simpleconsumer.fetch(req);
if (fetchResponse.hasError()) {
    numErrors++;
    etc...
{code}

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
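Until the behavior is changed, a caller-side workaround consistent with this report would be to catch ClosedChannelException in addition to checking fetchResponse.hasError(), and re-discover the leader before retrying. The sketch below is hypothetical: the retry helper and the LeaderRefresher interface are illustrative, not part of the Kafka API.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical workaround sketch: SimpleConsumer.fetch() can throw
// ClosedChannelException directly when the leader dies, so the caller must
// handle both the exception path and the hasError() path.
public class FetchRetry {
    interface LeaderRefresher { void refreshLeader(); }

    static <T> T fetchWithRetry(Callable<T> fetch, LeaderRefresher refresher, int maxAttempts)
            throws Exception {
        ClosedChannelException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return fetch.call();
            } catch (ClosedChannelException e) {
                last = e;                  // channel to the old leader is gone
                refresher.refreshLeader(); // look up the new leader, rebuild the consumer
            }
        }
        throw last; // all attempts exhausted
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulate a fetch that fails once (leader failover) and then succeeds.
        String result = fetchWithRetry(() -> {
            if (calls.incrementAndGet() == 1) throw new ClosedChannelException();
            return "ok";
        }, () -> { /* re-resolve the leader here */ }, 3);
        System.out.println(result); // prints ok
    }
}
```

In the real code, `refreshLeader()` would redo the metadata lookup and construct a new SimpleConsumer against the new leader, mirroring what the hasError() branch already does.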