Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
+1 on trying to reuse the NetworkClient code. I think Jun's approach could work, but I'm wondering if refactoring a bit could get better separation of concerns without a somewhat awkward nop implementation of Metadata. I'm not sure what combination of delegation or subclassing makes sense yet, but here's another approach that I think could work:

* Get rid of metadata stuff from NetworkClient. Add a subclass that also manages all the metadata. (Since it's used for both producer and consumer, the obvious name that I first jumped to is ClientNetworkClient, but somehow I think we can come up with something less confusing.)
* leastLoadedNode is the only caller of metadata.fetch() in that class, maybeUpdateMetadata is the only caller of leastLoadedNode, maybeUpdateMetadata is only called in poll when a combination of metadata related timeouts end up being 0. These can be safely refactored into the subclass with one override of poll(). Same with metadataFetchInProgress assuming the rest of the changes below.
* Some of the default implementations (e.g. handleMetadataResponse) can be left nops in NetworkClient and moved to the subclass.
* Others can be overridden to call the super method then take the additional action necessary (e.g., on disconnect, move the metadata update request to the subclass).
* Making the timeout handling in poll() work for both NetworkClient and the new base class might be the messiest part and might require breaking down the implementation of poll into multiple methods.
* isReady uses metadataFetchInProgress and gets a timeout from the Metadata class. We can just override this method as well, though I feel like there's probably a cleaner solution.

-Ewen

On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:

Hi, Jiangjie,

Thanks for taking on this. I was thinking that one way to decouple the dependency on Metadata in NetworkClient is the following.
1. Make Metadata an interface.
2. Rename current Metadata class to sth like KafkaMetadata that implements the Metadata interface.
3. Have a new NoOpMetadata class that implements the Metadata interface. This class
   3.1 does nothing for any write method
   3.2 returns max long for any method that asks for a timestamp
   3.3 returns an empty Cluster for fetch().

Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata when using NetworkClient in the controller. The consumer/producer client will be using KafkaMetadata.

As for replica fetchers, it may be possible to use KafkaConsumer. However, we don't need the metadata and the offset management. So, perhaps it's easier to just use NetworkClient. Also, currently, there is one replica fetcher thread per source broker. By using NetworkClient, we can change that to using a single thread for all source brokers. This is probably a bigger change. So, maybe we can do it later.

Jun

I think we probably need to replace replica fetcher with NetworkClient as well. Replica fetcher gets leader from the controller and therefore doesn't

On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

I am trying to see if we can reuse the NetworkClient class to be used in controller to broker communication. (Also, we can probably use KafkaConsumer, which is already using NetworkClient, in replica fetchers.) Currently NetworkClient does the following things in addition to sending requests.
1. Connection state management.
2. Flow control (inflight requests)
3. Metadata refresh
In the controller we need (1) and (2) but not (3).
NetworkClient is tightly coupled with metadata right now, and this is the major blocker to reusing NetworkClient in the controller. For the controller, we don’t need NetworkClient to manage any metadata because the controller has listeners to monitor the cluster state and has all the information about topic metadata. I am thinking we can add a "disable metadata refresh" flag to NetworkClient, or set the metadata refresh interval to Long.MAX_VALUE, so the metadata will be managed outside NetworkClient. This needs minimal change to allow NetworkClient to be reused, but the ugly part is that NetworkClient still holds the entire Metadata while it actually only needs a NodeList. I want to see what people think about this. Thanks.

Jiangjie (Becket) Qin

--
Thanks,
Ewen
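Concretely, Jun's NoOpMetadata idea might look roughly like the following Java sketch. The interface methods here are assumptions inferred from points 1-3 of his message, not the actual client API, so treat the names and signatures as illustrative only.

{code}
import org.apache.kafka.common.Cluster;

// Sketch of the proposed extracted interface; the existing concrete class
// would become the "KafkaMetadata" implementation used by producer/consumer.
interface Metadata {
    Cluster fetch();
    long timeToNextUpdate(long nowMs);
    void requestUpdate();
    void update(Cluster cluster, long nowMs);
}

// Controller-side implementation following points 3.1-3.3 above.
class NoOpMetadata implements Metadata {
    @Override
    public Cluster fetch() {
        return Cluster.empty();            // 3.3: always an empty Cluster
    }

    @Override
    public long timeToNextUpdate(long nowMs) {
        return Long.MAX_VALUE;             // 3.2: a refresh is never due
    }

    @Override
    public void requestUpdate() {
        // 3.1: write methods do nothing
    }

    @Override
    public void update(Cluster cluster, long nowMs) {
        // 3.1: write methods do nothing
    }
}
{code}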
[jira] [Created] (KAFKA-2173) Kafka died after throw more error
Truyet Nguyen created KAFKA-2173:
Summary: Kafka died after throw more error
Key: KAFKA-2173
URL: https://issues.apache.org/jira/browse/KAFKA-2173
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2.1
Environment: VPS Server CentOs 6.6 4G Ram
Reporter: Truyet Nguyen

Kafka died after server.log repeatedly threw this error:

[2015-05-05 16:08:34,616] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows
[ https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530071#comment-14530071 ]

Honghai Chen commented on KAFKA-2170:
-

[~junrao] This failure is not related to OffsetIndex.resize(). This issue is related to file.rename() failing on Windows if the file is open.

For the OffsetIndex.resize() issue: after adding logging, I see the unmap has been triggered, but I still get that exception. Adding the case below to LogSegmentTest reproduces the exception:

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
    at java.io.RandomAccessFile.setLength(Native Method)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply$mcV$sp(OffsetIndex.scala:299)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:284)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:284)
    at kafka.log.OffsetIndex.maybeLock(OffsetIndex.scala:406)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:284)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:273)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:271)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:271)
    at kafka.log.OffsetIndex.maybeLock(OffsetIndex.scala:406)
    at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
    at kafka.log.LogSegment.recover(LogSegment.scala:201)
    at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash2(LogSegmentTest.scala:304)

  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    // append two message sets to a fresh segment
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime)
    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    // flush and close the segment, then reopen it and run recovery
    seg.flush()
    seg.log.channel.close()
    seg.index.close()
    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)
  }

10 LogTest cases failed for file.renameTo failed under windows
---
Key: KAFKA-2170
URL: https://issues.apache.org/jira/browse/KAFKA-2170
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.9.0
Environment: Windows
Reporter: Honghai Chen
Assignee: Jay Kreps

Get the latest code from trunk, then run the tests: gradlew -i core:test --tests kafka.log.LogTest
Got 10 cases failing for the same reason:

kafka.common.KafkaStorageException: Failed to change the log file suffix from  to .deleted for log segment 0
    at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
    at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
    at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
    at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
    at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at kafka.log.Log.deleteOldSegments(Log.scala:514)
    at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
    at
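For context on the "user-mapped section open" failure above: on Windows a file cannot be truncated (RandomAccessFile.setLength) or renamed while a MappedByteBuffer still maps it, so the mapping has to be released before the resize. The sketch below illustrates the usual workaround; it relies on JDK-internal APIs (sun.misc.Cleaner / sun.nio.ch.DirectBuffer), so availability depends on the JVM version, and it is shown only to illustrate the pattern, not as the actual fix on this ticket.

{code}
import java.nio.MappedByteBuffer;

public class MmapUtil {
    /**
     * Forcibly release a memory mapping so the underlying file can be
     * truncated or renamed on Windows. Uses JDK-internal classes, so this
     * is non-portable and only works on Oracle/OpenJDK 7/8 era JVMs.
     */
    @SuppressWarnings("restriction")
    public static void forceUnmap(MappedByteBuffer buffer) {
        sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
        if (cleaner != null)
            cleaner.clean();   // releases the mapping immediately instead of waiting for GC
    }
}
{code}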
[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization
[ https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Attachment: KAFKA-2174.patch

Wrong TopicMetadata deserialization
---
Key: KAFKA-2174
URL: https://issues.apache.org/jira/browse/KAFKA-2174
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
Attachments: KAFKA-2174.patch

TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but that is not always true. On incomplete metadata we get a java.lang.ArrayIndexOutOfBoundsException:
{code}
java.lang.ArrayIndexOutOfBoundsException: 47
        at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
{code}
We sometimes get these exceptions on any broker restart (kill -TERM, controlled.shutdown.enable=false).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization
[ https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Status: Patch Available (was: Open)

Wrong TopicMetadata deserialization
---
Key: KAFKA-2174
URL: https://issues.apache.org/jira/browse/KAFKA-2174
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
Attachments: KAFKA-2174.patch

TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but that is not always true. On incomplete metadata we get a java.lang.ArrayIndexOutOfBoundsException:
{code}
java.lang.ArrayIndexOutOfBoundsException: 47
        at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
{code}
We sometimes get these exceptions on any broker restart (kill -TERM, controlled.shutdown.enable=false).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Thanks Jay. I removed partitioner.metadata from the KIP. I’ll send an updated patch.
--
Harsha
Sent with Airmail

On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote:

Thanks for the comments everyone.
Hi Jay, I do have a question regarding the configurable interface on how to pass in a Map<String, ?> of properties. I couldn’t find any other classes using it. JMX reporter overrides it but doesn’t implement it. So with a configurable partitioner, how can a user pass in partitioner configuration since it's getting instantiated within the producer?
Thanks,
Harsha

On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,
That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines.
-Jay

On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:

Thanks Jay and Gianmarco for the comments. I picked option A: if the user sends a partition id then it will be applied, and the partitioner.class method will only be called if the partition id is null. Please take a look at the updated KIP here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer . Let me know if you see anything missing.
Thanks,
Harsha

On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:

Hi,
Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in ProducerRecord which gives full control? I think we do need it because this is a way to plug in a different partitioning strategy at run time and do it in a fairly transparent way.
Yes, we need it if we want to support different partitioning strategies inside Kafka rather than requiring the user to code them externally.
3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition.
I am not entirely sure about this. I guess that most partitioners should not use it. I think it makes it easier to reason about the system if the partitioner only works on the key. However, if the value (and its serialization) are already available, there is not much harm in passing them along.
4. This interface doesn't include either an init() or close() method. It should implement Closeable and Configurable, right?
Right now the only application I can think of to have an init() and close() is to read some state information (e.g., load information) that is published on some external distributed storage (e.g., zookeeper) by the brokers. It might be useful also for reconfiguration and state migration. I think it's not a very common use case right now, but if the added complexity is not too much it might be worth it to have support for these methods.
5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner? Does the partition id just get passed in to the partitioner (as sort of implied in this interface?). This is a bit weird since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case no point in including partition in the Partitioner api).
The user should be able to override the partitioner on a per-record basis by explicitly setting the partition id. I don't think it makes sense for the partitioners to take hints on the partition. I would even go the extra step and have default logic that accepts both key and partition id (current interface) and calls partition() only if the partition id is not set. The partition() method does *not* take the partition ID as input (only key-value).

Cheers,
--
Gianmarco

Cheers,
-Jay

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
Here is the KIP for adding a partitioner interface for the producer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take a look and let me know if you prefer one way or the other.
Thanks,
Harsha
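For reference, a minimal Java sketch of the kind of pluggable interface being discussed in this thread. The exact signature (in particular whether the value and an explicitly set partition id are passed in) was still open at this point, so the parameters below are assumptions rather than the final KIP-22 API; configuration arrives through Configurable, which is why no extra partitioner.metadata property is needed.

{code}
import java.io.Closeable;
import java.util.Map;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

/**
 * Sketch of a pluggable partitioner. configure(Map<String, ?>) is inherited
 * from Configurable, so a partitioner can pick up arbitrary string properties
 * from the producer config at instantiation time.
 */
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for a record. Per the discussion above, this would
     * only be invoked when the caller did not set an explicit partition id.
     */
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    /** Release any resources (e.g., connections to an external load store). */
    void close();
}
{code}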
[jira] [Created] (KAFKA-2174) Wrong TopicMetadata deserialization
Alexey Ozeritskiy created KAFKA-2174:
Summary: Wrong TopicMetadata deserialization
Key: KAFKA-2174
URL: https://issues.apache.org/jira/browse/KAFKA-2174
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy

TopicMetadata.readFrom assumes that the ByteBuffer always contains the full set of partitions, but that is not always true. On incomplete metadata we get a java.lang.ArrayIndexOutOfBoundsException:
{code}
java.lang.ArrayIndexOutOfBoundsException: 47
        at kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Range.foreach(Range.scala:141)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
{code}
We sometimes get these exceptions on any broker restart (kill -TERM, controlled.shutdown.enable=false).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
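The failure mode in KAFKA-2174 is a general one for hand-rolled wire deserialization: the reader trusts a count field even when the buffer is truncated. The Java snippet below is only a generic illustration of the defensive check the readFrom path needs; it is not the actual Scala fix in the attached patch, and the minimum-entry-size constant is made up for the example.

{code}
import java.nio.ByteBuffer;

public final class MetadataBufferCheck {
    // Assumed lower bound on the serialized size of one partition entry;
    // purely illustrative, not the real wire-format constant.
    private static final int MIN_PARTITION_ENTRY_BYTES = 4;

    /**
     * Read a claimed partition count and sanity-check it against what the
     * buffer can actually contain, so an incomplete response fails with a
     * clear error instead of an ArrayIndexOutOfBoundsException later on.
     */
    public static int readPartitionCount(ByteBuffer buffer) {
        int claimed = buffer.getInt();
        int maxPossible = buffer.remaining() / MIN_PARTITION_ENTRY_BYTES;
        if (claimed < 0 || claimed > maxPossible)
            throw new IllegalStateException("Incomplete topic metadata: claimed "
                + claimed + " partitions but only " + buffer.remaining() + " bytes remain");
        return claimed;
    }
}
{code}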
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Yes, that is the plan.

On 5/5/15, 8:23 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

Just a quick question, can we handle REQUEST TIMEOUT as disconnections and do a fresh MetaDataRequest and retry instead of failing the request?
Thanks,
Mayuresh

On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

I incorporated Ewen and Guozhang’s comments in the KIP page. I want to speed up this KIP because currently mirror-maker very likely hangs when a broker is down.
I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata timeout to expire the batches which are sitting in the accumulator without leader info. I did that because the situation there is essentially missing metadata.
As a summary of what I am thinking about the timeouts in the new producer:
1. Metadata timeout:
   - used in send(), blocking
   - used in the accumulator to expire batches with a timeout exception.
2. Linger.ms
   - used in the accumulator to ready the batch for drain
3. Request timeout
   - used in NetworkClient to expire a batch and retry if no response is received for a request before timeout.
So in this KIP, we only address (3). The only public interface change is a new configuration for the request timeout (and maybe changing the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
I would like to see what people think of the above approach.

Jiangjie (Becket) Qin

On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

Jun,
I thought a little bit differently on this. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready because we don’t know which broker we should send the message to. So those sends need to be blocked on the metadata timeout.
Another thing I’m wondering is in which scenario an offline partition will become online again in a short period of time, and how likely that will occur. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches to be aborted.
That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary.
What do you think?
Thanks,
Jiangjie (Becket) Qin

On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:

Jiangjie,
Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too.
Thanks,
Jun

On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Harsha,
Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
Jiangjie (Becket) Qin

On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Guozhang and Jiangjie,
Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please review the patch there.
Thanks,
Harsha

On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:

Thanks for the update Jiangjie,
I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours.
A couple of comments on the wiki:
1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.
2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward.
Guozhang

On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the
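To make the three timeouts summarized in Becket's message above concrete, here is a hedged example of how they would surface as producer settings. metadata.fetch.timeout.ms and linger.ms are existing configs in the new producer; request.timeout.ms is the name proposed in KIP-19 and subject to the renaming discussion, so treat it as an assumption rather than a released setting.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TimeoutConfigExample {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // (1) Metadata timeout: bounds how long send() may block waiting for
        //     metadata, and (per KAFKA-2142) how long a batch without leader
        //     info may sit in the accumulator.
        props.put("metadata.fetch.timeout.ms", "60000");
        // (2) Linger: how long a batch waits in the accumulator to fill up
        //     before it is ready to drain.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
        // (3) Proposed request timeout (KIP-19): expire an in-flight request
        //     and retry if no response arrives in time.
        props.put("request.timeout.ms", "30000");
        return props;
    }
}
{code}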
Review Request 33916: Patch for KAFKA-2163
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33916/
---

Review request for kafka.

Bugs: KAFKA-2163
    https://issues.apache.org/jira/browse/KAFKA-2163

Repository: kafka

Description
---
fix renames and logging improvements

Diffs
-
  core/src/main/scala/kafka/cluster/Partition.scala 122b1dbbe45cb27aed79b5be1e735fb617c716b0
  core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a

Diff: https://reviews.apache.org/r/33916/diff/

Testing
---

Thanks,
Joel Koshy
[jira] [Updated] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets
[ https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-2163:
--
Attachment: KAFKA-2163.patch

Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets
-
Key: KAFKA-2163
URL: https://issues.apache.org/jira/browse/KAFKA-2163
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Fix For: 0.8.3
Attachments: KAFKA-2163.patch

When leadership of an offsets partition moves, the new leader loads offsets from that partition into the offset manager cache. Independently, the offset manager has a periodic cleanup task for stale offsets that removes old offsets from the cache and appends tombstones for those. If the partition happens to contain much older offsets (earlier in the log), the load inserts those into the cache; the cleanup task may then run, see those offsets (which it deems to be stale), and proceed to remove them from the cache and append a tombstone to the end of the log. The tombstone will override the true latest offset and a subsequent offset fetch request will return no offset. We just need to prevent the cleanup task from running during an offset load.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
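A simplified, hypothetical Java sketch of the guard the issue asks for: track which offsets partitions are currently loading and have the periodic cleanup task skip them. The real fix lives in the Scala OffsetManager (see the attached patch); the class and method names below are illustrative only.

{code}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetLoadGuard {
    private final Set<Integer> loadingPartitions = ConcurrentHashMap.newKeySet();

    // Called when the new leader starts loading an offsets partition into the cache.
    public void loadStarted(int offsetsPartition) {
        loadingPartitions.add(offsetsPartition);
    }

    // Called once the load has completed.
    public void loadFinished(int offsetsPartition) {
        loadingPartitions.remove(offsetsPartition);
    }

    /** Checked by the cleanup task before removing entries or appending tombstones. */
    public boolean safeToClean(int offsetsPartition) {
        return !loadingPartitions.contains(offsetsPartition);
    }
}
{code}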
[jira] [Updated] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets
[ https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-2163: -- Assignee: Joel Koshy Status: Patch Available (was: Open) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets - Key: KAFKA-2163 URL: https://issues.apache.org/jira/browse/KAFKA-2163 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Joel Koshy Fix For: 0.8.3 Attachments: KAFKA-2163.patch When leadership of an offsets partition moves, the new leader loads offsets from that partition into the offset manager cache. Independently, the offset manager has a periodic cleanup task for stale offsets that removes old offsets from the cache and appends tombstones for those. If the partition happens to contain much older offsets (earlier in the log) and inserts those into the cache; the cleanup task may run and see those offsets (which it deems to be stale) and proceeds to remove from the cache and append a tombstone to the end of the log. The tombstone will override the true latest offset and a subsequent offset fetch request will return no offset. We just need to prevent the cleanup task from running during an offset load. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
On April 18, 2015, 12:49 a.m., Jun Rao wrote: core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72 https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66 I am wondering if we need both completed() and remaining() in Send. It seems that one of the two is enough for our usage. Also, not sure how useful reify() is. Currently, it's not used anywhere. Gwen Shapira wrote: reification is a very central concept in Kafka's The Trial :) I agree that reify() and remaining() can be removed without any pain. Since it is a public API, do we need to deprecate them first? Yes, we should we able to remove them since the api that's public is in kafka.javaapi.FetchResponse. - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review80557 --- On May 1, 2015, 10:45 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 1, 2015, 10:45 p.m.) Review request for kafka. Bugs: KAFKA-1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. 
Cleaned up destinations as well Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0 clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3 clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7 clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5 clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab13e3c02eb351558ec973b949b3d1196085 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
[jira] [Commented] (KAFKA-2150) FetcherThread backoff need to grab lock before wait on condition.
[ https://issues.apache.org/jira/browse/KAFKA-2150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531695#comment-14531695 ]

Sriharsha Chintalapani commented on KAFKA-2150:
---

[~guozhang] Can you please take a look at this patch? Thanks.

FetcherThread backoff need to grab lock before wait on condition.
-
Key: KAFKA-2150
URL: https://issues.apache.org/jira/browse/KAFKA-2150
Project: Kafka
Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Sriharsha Chintalapani
Labels: newbie++
Attachments: KAFKA-2150.patch, KAFKA-2150_2015-04-25_13:14:05.patch, KAFKA-2150_2015-04-25_13:18:35.patch, KAFKA-2150_2015-04-25_13:35:36.patch

Saw the following error: kafka.api.ProducerBounceTest testBrokerFailure STANDARD_OUT

[2015-04-25 00:40:43,997] ERROR [ReplicaFetcherThread-0-0], Error due to (kafka.server.ReplicaFetcherThread:103)
java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:127)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1239)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1668)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

[2015-04-25 00:40:47,064] ERROR [ReplicaFetcherThread-0-1], Error due to (kafka.server.ReplicaFetcherThread:103)
java.lang.IllegalMonitorStateException
    at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:127)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1239)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1668)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

We should grab the lock before waiting on the condition.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
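The stack traces above are the classic symptom of calling Condition.await() without holding the lock the condition belongs to. The actual change is in the Scala AbstractFetcherThread; the Java snippet below is only a minimal illustration of the correct lock-then-await pattern.

{code}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BackoffExample {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Condition partitionMapCond = partitionMapLock.newCondition();

    // Back off for up to backoffMs, waking early if the condition is signalled.
    public void backoff(long backoffMs) throws InterruptedException {
        partitionMapLock.lock();           // must hold the lock before await()
        try {
            partitionMapCond.await(backoffMs, TimeUnit.MILLISECONDS);
        } finally {
            partitionMapLock.unlock();     // always release, even on interrupt
        }
    }
}
{code}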
Re: [VOTE] KIP-4 Admin Commands / Phase-1
+1 (binding) ~ Joe Stein - - - - - - - - - - - - - - - - - http://www.stealth.ly - - - - - - - - - - - - - - - - - On Tue, May 5, 2015 at 11:16 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Hi all, This is a voting thread for KIP-4 Phase-1. It will include Wire protocol changes and server side handling code. https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations Thanks, Andrii Biletskyi
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
+1 with a minor comment: do we need an init method given it extends Configurable?

Also, can you move this wiki out of drafts and add it to the table in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?

Thanks,
Joel

On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote:

Thanks Jay. I removed partitioner.metadata from the KIP. I’ll send an updated patch.
--
Harsha
Sent with Airmail

On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote:

Thanks for the comments everyone.
Hi Jay, I do have a question regarding the configurable interface on how to pass in a Map<String, ?> of properties. I couldn’t find any other classes using it. JMX reporter overrides it but doesn’t implement it. So with a configurable partitioner, how can a user pass in partitioner configuration since it's getting instantiated within the producer?
Thanks,
Harsha

On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,
That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines.
-Jay

On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:

Thanks Jay and Gianmarco for the comments. I picked option A: if the user sends a partition id then it will be applied, and the partitioner.class method will only be called if the partition id is null. Please take a look at the updated KIP here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer . Let me know if you see anything missing.
Thanks,
Harsha

On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:

Hi,
Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in ProducerRecord which gives full control? I think we do need it because this is a way to plug in a different partitioning strategy at run time and do it in a fairly transparent way.
Yes, we need it if we want to support different partitioning strategies inside Kafka rather than requiring the user to code them externally.
3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition.
I am not entirely sure about this. I guess that most partitioners should not use it. I think it makes it easier to reason about the system if the partitioner only works on the key. However, if the value (and its serialization) are already available, there is not much harm in passing them along.
4. This interface doesn't include either an init() or close() method. It should implement Closeable and Configurable, right?
Right now the only application I can think of to have an init() and close() is to read some state information (e.g., load information) that is published on some external distributed storage (e.g., zookeeper) by the brokers. It might be useful also for reconfiguration and state migration. I think it's not a very common use case right now, but if the added complexity is not too much it might be worth it to have support for these methods.
5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner?
Does the partition id just get passed in to the partitioner (as sort of implied in this interface?). This is a bit weird since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case no point in including partition in the Partitioner api). The user should be able to override the partitioner on a per-record basis by explicitly setting the partition id. I don't think it makes sense for the partitioners to take hints on the partition. I would even go the extra step, and have a default logic that accepts both key and partition id (current interface) and calls partition() only if the partition id is not set. The partition() method does *not* take the partition ID as input (only key-value). Cheers, -- Gianmarco Cheers, -Jay On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi, Here is the KIP for adding a partitioner interface for producer.
Re: Need Access to Wiki Page To Create Page for Discussion
What is your wiki user id?

Thanks,
Jun

On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote:

Hi All,
I need access to create a Discussion or KIP document. Let me know what the process is for getting access.
Thanks,
Bhavesh
[jira] [Commented] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list
[ https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531672#comment-14531672 ] Guozhang Wang commented on KAFKA-2160: -- Updated reviewboard https://reviews.apache.org/r/33731/diff/ against branch origin/trunk DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list --- Key: KAFKA-2160 URL: https://issues.apache.org/jira/browse/KAFKA-2160 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch, KAFKA-2160_2015-05-06_16:31:48.patch With purgatory usage in consumer coordinator, it will be common that watcher lists are very short and live only for a short time. So we'd better clean them from the watchersForKey Pool once the list become empty in checkAndComplete() calls. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33731: Second Attempt to Fix KAFKA-2160
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33731/
---

(Updated May 6, 2015, 11:31 p.m.)

Review request for kafka.

Summary (updated)
-
Second Attempt to Fix KAFKA-2160

Bugs: KAFKA-2160
    https://issues.apache.org/jira/browse/KAFKA-2160

Repository: kafka

Description (updated)
---
Add a per-key lock in Pool which is used for non-primitive UpdateAndMaybeRemove function

Diffs (updated)
-
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d9404a3e78a365df06404b79d0d8f694b4bd6
  core/src/main/scala/kafka/server/DelayedOperation.scala 2ed9b467c2865e5717d7f6fd933cd09a5c5b22c0
  core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
  core/src/main/scala/kafka/utils/Pool.scala 9ddcde797341ddd961923cafca16472d84417b5a
  core/src/main/scala/kafka/utils/timer/Timer.scala b8cde820a770a4e894804f1c268b24b529940650
  core/src/test/scala/unit/kafka/log/LogConfigTest.scala f3546adee490891e0d8d0214bef00b1dd7f42227
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620

Diff: https://reviews.apache.org/r/33731/diff/

Testing
---

Thanks,
Guozhang Wang
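A rough Java illustration of the idea behind this patch: make "remove the watcher list once it is empty" atomic per key so a concurrent add cannot race with the removal. The real change adds a per-key lock to the Scala Pool class; in this sketch ConcurrentHashMap.compute() plays that role, and all names are illustrative rather than the actual purgatory code.

{code}
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class WatcherPool<K, W> {
    private final ConcurrentHashMap<K, List<W>> watchersForKey = new ConcurrentHashMap<>();

    // Register a watcher for a key, creating the list on first use.
    public void watch(K key, W watcher) {
        watchersForKey.compute(key, (k, list) -> {
            if (list == null)
                list = new CopyOnWriteArrayList<>();
            list.add(watcher);
            return list;
        });
    }

    /** Called after purging completed operations; drops the mapping when empty. */
    public void removeIfEmpty(K key) {
        // Returning null from compute() removes the entry atomically for this key.
        watchersForKey.compute(key, (k, list) ->
            (list == null || list.isEmpty()) ? null : list);
    }
}
{code}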
[jira] [Updated] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list
[ https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-2160: - Attachment: KAFKA-2160_2015-05-06_16:31:48.patch DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list --- Key: KAFKA-2160 URL: https://issues.apache.org/jira/browse/KAFKA-2160 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Assignee: Guozhang Wang Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch, KAFKA-2160_2015-05-06_16:31:48.patch With purgatory usage in consumer coordinator, it will be common that watcher lists are very short and live only for a short time. So we'd better clean them from the watchersForKey Pool once the list become empty in checkAndComplete() calls. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets
[ https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531529#comment-14531529 ] Joel Koshy commented on KAFKA-2163: --- Created reviewboard https://reviews.apache.org/r/33916/diff/ against branch origin/trunk Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets - Key: KAFKA-2163 URL: https://issues.apache.org/jira/browse/KAFKA-2163 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Fix For: 0.8.3 Attachments: KAFKA-2163.patch When leadership of an offsets partition moves, the new leader loads offsets from that partition into the offset manager cache. Independently, the offset manager has a periodic cleanup task for stale offsets that removes old offsets from the cache and appends tombstones for those. If the partition happens to contain much older offsets (earlier in the log) and inserts those into the cache; the cleanup task may run and see those offsets (which it deems to be stale) and proceeds to remove from the cache and append a tombstone to the end of the log. The tombstone will override the true latest offset and a subsequent offset fetch request will return no offset. We just need to prevent the cleanup task from running during an offset load. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review82625 --- Thanks for the patch. Looks great overall! Some comments below. Also, there seems to be compilation error when running tests. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/33065/#comment133525 Instead of doing the id to string conversion every time, perhaps we can compute the string of the id once in Node and expose it through an idString() method? clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/33065/#comment133526 Perhaps changing nodeId to node. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/33065/#comment133527 Perhaps changing id to node? clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java https://reviews.apache.org/r/33065/#comment133529 Since we always use a 4 byte size, perhaps we should make both remaining() and writeTo() return int? clients/src/main/java/org/apache/kafka/common/network/MultiSend.java https://reviews.apache.org/r/33065/#comment133564 Not sure why we need to pass in expectedBytesToWrite. This can be computed from the sends. clients/src/main/java/org/apache/kafka/common/network/MultiSend.java https://reviews.apache.org/r/33065/#comment133532 Instead of moving back the cursor in the iterator, would it be simpler to maintain a current Send that's being iterated on? clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java https://reviews.apache.org/r/33065/#comment133534 request size probably should be renamed to receive size since it can be for either request or response. clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133512 There is a followup patch in KAFKA-1282 that we haven't incorporated. Bascially, the linked hash map needs to be in access order. Since we are moving code around, could you incorporate the change here? clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133537 Instead of -1, perhaps we can reuse NetworkReceive.UNLIMITED? clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133541 Could we just iterate the keySet directly instead of making a copy first? clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133543 It's reasonable to have a size for Send. The server size only records sent bytes, instead of messages. clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133544 In this case, we should probably just propagate the exception and potentially kill the caller, instead of continue, right? clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment133548 Perhaps we could pass in connectionMaxIdleNanos to the constructor of Selector. Then this method can be private. For clients, we can default to max long so that they could close idle connections since the broker is doing that. The broker can pass in the connectionMaxIdleNanos from config. core/src/main/scala/kafka/network/RequestChannel.scala https://reviews.apache.org/r/33065/#comment133506 We probably don't need remoteAddress any more since that's part of connectionId now. 
core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment133395 Could we define both methods as override? core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment133567 Currently, Selector exposes metrics per connection. While this works fine for producer/consumer clients since the number of brokers is typically small, I am concerned about having those on the server since there could be 10s of thousands of connections in a broker. Perhaps we can pass in a config to disable per node metric in Selector when used in the broker. We probably should think through if there is any metrics in Selector that we want to expose in Coda Hale metrics. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment133500 It seems that we don't need startSelectTime. We will need to expose the io-wait-ratio metric in selector to networkProcessor.IdlePercent. This can be done in a followup jira though if the patch is too big. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment133495 No need for return value. core/src/main/scala/kafka/network/SocketServer.scala
[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino updated KAFKA-2175: --- Status: Patch Available (was: Open) Reduce server log verbosity at info level - Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Todd Palino Priority: Minor Labels: newbie Attachments: KAFKA-2175.patch Currently, the broker logs two messages at INFO level that should be at a lower level. This serves only to fill up log files on disk, and can cause performance issues due to synchronous logging as well. The first is the Closing socket connection message when there is no error. This should be reduced to debug level. The second is the message that ZkUtil writes when updating the partition reassignment JSON. This message contains the entire JSON blob and should never be written at info level. In addition, there is already a message in the controller log stating that the ZK node has been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level
[ https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Todd Palino reassigned KAFKA-2175: -- Assignee: Todd Palino (was: Neha Narkhede) Reduce server log verbosity at info level - Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Todd Palino Priority: Minor Labels: newbie Attachments: KAFKA-2175.patch Currently, the broker logs two messages at INFO level that should be at a lower level. This serves only to fill up log files on disk, and can cause performance issues due to synchronous logging as well. The first is the Closing socket connection message when there is no error. This should be reduced to debug level. The second is the message that ZkUtil writes when updating the partition reassignment JSON. This message contains the entire JSON blob and should never be written at info level. In addition, there is already a message in the controller log stating that the ZK node has been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level
Todd Palino created KAFKA-2175: -- Summary: Reduce server log verbosity at info level Key: KAFKA-2175 URL: https://issues.apache.org/jira/browse/KAFKA-2175 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.8.3 Reporter: Todd Palino Assignee: Neha Narkhede Priority: Minor Currently, the broker logs two messages at INFO level that should be at a lower level. This serves only to fill up log files on disk, and can cause performance issues due to synchronous logging as well. The first is the Closing socket connection message when there is no error. This should be reduced to debug level. The second is the message that ZkUtil writes when updating the partition reassignment JSON. This message contains the entire JSON blob and should never be written at info level. In addition, there is already a message in the controller log stating that the ZK node has been updated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
Jiangjie,

Just trying to figure out the class reference hierarchy of this approach (A -> B means A either has a member variable of B or takes B as an API parameter). Metadata will have an interface that takes in KafkaClient as a parameter, so Metadata -> KafkaClient.
1. For producer: KafkaProducer -> Sender, KafkaProducer -> Metadata, Sender -> KafkaClient, Sender -> Metadata
2. For consumer: KafkaConsumer -> Coordinator, KafkaConsumer -> Fetcher, KafkaConsumer -> KafkaClient, KafkaConsumer -> Metadata, Coordinator -> KafkaClient, Coordinator -> Metadata, Fetcher -> KafkaClient, Fetcher -> Metadata
3. For controller: KafkaController -> KafkaClient
4. For replica fetcher: ReplicaFetcher -> KafkaClient

For producer / consumer, the interleaving seems a bit complicated to me. Instead, we could completely remove the concept of Metadata from KafkaClient, such that NetworkClient.handleCompletedReceives does not specifically handle metadata responses, but just calls response.request().callback().onComplete(response) as well, which will try to update the metadata and check for any errors.

Guozhang

On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Jun Ewen,

Thanks a lot for the comments. Both of them look better than my original plan.
Jun, one downside about not changing NetworkClient but using a different metadata implementation is that NetworkClient will still have all the metadata related code there, which makes it a little bit weird. I think Ewen’s approach solves this problem.
Ewen, if I understand correctly, you are proposing something similar to the structure we are using in AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That will make NetworkClient look clean but increase the layers, which is probably OK.
Inspired by your suggestions, I have another thought which seems closer to Jun’s idea. What if we move maybeUpdateMetadata()/handleMetadataResponse() and related logic in NetworkClient to metadata and pass in NetworkClient as an argument? Like Jun suggested, we need a Metadata interface and different implementations.

Thanks.

Jiangjie (Becket) Qin

On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

+1 on trying to reuse the NetworkClient code. I think Jun's approach could work, but I'm wondering if refactoring a bit could get better separation of concerns without a somewhat awkward nop implementation of Metadata. I'm not sure what combination of delegation or subclassing makes sense yet, but here's another approach that I think could work:
* Get rid of metadata stuff from NetworkClient. Add a subclass that also manages all the metadata. (Since it's used for both producer and consumer, the obvious name that I first jumped to is ClientNetworkClient, but somehow I think we can come up with something less confusing.)
* leastLoadedNode is the only caller of metadata.fetch() in that class, maybeUpdateMetadata is the only caller of leastLoadedNode, maybeUpdateMetadata is only called in poll when a combination of metadata related timeouts end up being 0. These can be safely refactored into the subclass with one override of poll(). Same with metadataFetchInProgress assuming the rest of the changes below.
* Some of the default implementations (e.g. handleMetadataResponse) can be left nops in NetworkClient and moved to the subclass.
* Others can be overridden to call the super method then take the additional action necessary (e.g., on disconnect, move the metadata update request to the subclass).
* Making the timeout handling in poll() work for both NetworkClient and the new base class might be the messiest part and might require breaking down the implementation of poll into multiple methods. * isReady uses metadataFetchInProgress and gets a timeout from the Metadata class. We can just override this method as well, though I feel like there's probably a cleaner solution. -Ewen On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote: Hi, Jiangjie, Thanks for taking on this. I was thinking that one way to decouple the dependency on Metadata in NetworkClient is the following. 1. Make Metadata an interface. 2. Rename current Metadata class to sth like KafkaMetadata that implements the Metadata interface. 3. Have a new NoOpMetadata class that implements the Metadata interface. This class 3.1 does nothing for any write method 3.2 returns max long for any method that asks for a timestamp 3.3. returns an empty Cluster for fetch(). Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata when using NetworkClient in the controller. The consumer/producer client will be using KafkaMetadata. As for replica fetchers, it may be possible to use KafkaConsumer. However, we don't need the metadata and the offset management. So, perhaps it's easier to just use NetworkClient. Also, currently, there is one replica fetcher thread per source broker.
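Guozhang's suggestion above (letting handleCompletedReceives simply invoke the response callback rather than special-casing metadata responses) could look roughly like the following. The handler interface and the MetadataCache stand-in are hypothetical names introduced for illustration, not existing client classes.

{code}
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.Cluster;

// Generic hook NetworkClient would invoke for every completed response.
interface RequestCompletionHandler {
    void onComplete(ClientResponse response);
}

// Minimal stand-in for the metadata abstraction discussed earlier in the thread.
interface MetadataCache {
    void update(Cluster cluster, long nowMs);
}

// Registered by the metadata-aware layer when it issues a metadata request;
// NetworkClient itself never inspects the response type.
class MetadataResponseHandler implements RequestCompletionHandler {
    private final MetadataCache metadata;

    MetadataResponseHandler(MetadataCache metadata) {
        this.metadata = metadata;
    }

    @Override
    public void onComplete(ClientResponse response) {
        // Parse the cluster out of the response and update the cache;
        // real code would read a MetadataResponse body and handle errors.
        metadata.update(parseCluster(response), System.currentTimeMillis());
    }

    private Cluster parseCluster(ClientResponse response) {
        return Cluster.empty();   // placeholder for actual response parsing
    }
}
{code}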
Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Jun, Ewen, Thanks a lot for the comments. Both of them look better than my original plan. Jun, one downside of not changing NetworkClient but using a different Metadata implementation is that NetworkClient will still have all the metadata-related code there, which makes it a little bit weird. I think Ewen's approach solves this problem. Ewen, if I understand correctly, you are proposing something similar to the structure we are using in AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That will make NetworkClient look clean but increase the layers, which is probably OK. Yes, same basic idea. Inspired by your suggestions, I have another thought which seems closer to Jun's idea. What if we move maybeUpdateMetadata()/handleMetadataResponse() and related logic in NetworkClient to Metadata and pass in NetworkClient as an argument? Like Jun suggested, we need a Metadata interface and different implementations. This could work too. I think it ends up with the new Metadata interface and NetworkClient being more coupled than layered. What I was hoping to do was to remove the concept of Metadata entirely from NetworkClient so it only deals with Nodes and is focused on the request/response handling. But that might require introducing a listener interface (if Metadata was refactored to be passed a NetworkClient it would use to initiate metadata refreshes) or providing the right hooks so that behavior can be overridden (if Metadata was to subclass NetworkClient). I think all these options could work and it's not clear to me which will work out best in separating the code, so I'd just try one of them and see how the patch looks. -Ewen Thanks. Jiangjie (Becket) Qin
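For reference, a minimal sketch of what the NoOpMetadata idea discussed in this thread might look like. The interface split and method names are illustrative assumptions, not the actual client code; it simply mirrors the three behaviors described (writes are no-ops, timestamps report "never", fetch() returns an empty Cluster):

import org.apache.kafka.common.Cluster;

// Hypothetical Metadata interface extracted from the current concrete class.
interface Metadata {
    Cluster fetch();
    long timeToNextUpdate(long nowMs);
    void requestUpdate();
    void update(Cluster cluster, long nowMs);
}

// NoOpMetadata for the controller use case: NetworkClient can keep calling
// these methods, but they never trigger or require a metadata refresh.
class NoOpMetadata implements Metadata {
    public Cluster fetch() { return Cluster.empty(); }
    public long timeToNextUpdate(long nowMs) { return Long.MAX_VALUE; }
    public void requestUpdate() { /* no-op */ }
    public void update(Cluster cluster, long nowMs) { /* no-op */ }
}

The producer and consumer would then plug in the existing (renamed) KafkaMetadata implementation, while the controller passes a NoOpMetadata and leaves NetworkClient itself unchanged.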
Need Access to Wiki Page To Create Page for Discussion
Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh
[jira] [Resolved] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)
[ https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sampath Reddy Lambu resolved KAFKA-2027. Resolution: Invalid kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true) --- Key: KAFKA-2027 URL: https://issues.apache.org/jira/browse/KAFKA-2027 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Environment: Kafka 0.8.1.1, Node.js Mac OS Reporter: Sampath Reddy Lambu Assignee: Neha Narkhede Priority: Blocker I would like report an issue when auto.leader.rebalance.enable=true. Kafka never sends an event/notification to its zookeeper client after preferred election complete. This works fine with manual rebalance from CLI (kafka-preferred-replica-election.sh). Initially i thought this issue was with Kafka-Node, but its not. An event should be emitted from zookeeper if any partition moved while preferred election. Im working with kafka_2.9.2-0.8.1.1 (Broker's) Kafka-Node(Node.JS). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)
[ https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sampath Reddy Lambu closed KAFKA-2027. -- kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true) --- Key: KAFKA-2027 URL: https://issues.apache.org/jira/browse/KAFKA-2027 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Environment: Kafka 0.8.1.1, Node.js Mac OS Reporter: Sampath Reddy Lambu Assignee: Neha Narkhede Priority: Blocker I would like report an issue when auto.leader.rebalance.enable=true. Kafka never sends an event/notification to its zookeeper client after preferred election complete. This works fine with manual rebalance from CLI (kafka-preferred-replica-election.sh). Initially i thought this issue was with Kafka-Node, but its not. An event should be emitted from zookeeper if any partition moved while preferred election. Im working with kafka_2.9.2-0.8.1.1 (Broker's) Kafka-Node(Node.JS). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)
[ https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531073#comment-14531073 ] Sampath Reddy Lambu commented on KAFKA-2027: this was handled in Kafka-Node project in 0.2.25 version. closing this issue. Thank you for your response. -Sampath kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true) --- Key: KAFKA-2027 URL: https://issues.apache.org/jira/browse/KAFKA-2027 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.1.1 Environment: Kafka 0.8.1.1, Node.js Mac OS Reporter: Sampath Reddy Lambu Assignee: Neha Narkhede Priority: Blocker I would like report an issue when auto.leader.rebalance.enable=true. Kafka never sends an event/notification to its zookeeper client after preferred election complete. This works fine with manual rebalance from CLI (kafka-preferred-replica-election.sh). Initially i thought this issue was with Kafka-Node, but its not. An event should be emitted from zookeeper if any partition moved while preferred election. Im working with kafka_2.9.2-0.8.1.1 (Broker's) Kafka-Node(Node.JS). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
Jun, Ewen, Thanks a lot for the comments. Both of them look better than my original plan. Jun, one downside of not changing NetworkClient but using a different Metadata implementation is that NetworkClient will still have all the metadata-related code there, which makes it a little bit weird. I think Ewen's approach solves this problem. Ewen, if I understand correctly, you are proposing something similar to the structure we are using in AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That will make NetworkClient look clean but increase the layers, which is probably OK. Inspired by your suggestions, I have another thought which seems closer to Jun's idea. What if we move maybeUpdateMetadata()/handleMetadataResponse() and related logic in NetworkClient to Metadata and pass in NetworkClient as an argument? Like Jun suggested, we need a Metadata interface and different implementations. Thanks. Jiangjie (Becket) Qin On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: I am trying to see if we can reuse the NetworkClient class to be used in controller to broker communication. (Also, we can probably use KafkaConsumer which is already using NetworkClient in replica fetchers). Currently NetworkClient does the following things in addition to sending requests. 1. Connection state management. 2. Flow control (inflight requests) 3. Metadata refresh In controller we need (1) and (2) but not (3). NetworkClient is tightly coupled with metadata now and this is the major blocker of reusing NetworkClient in controller. For controller, we don't need NetworkClient to manage any metadata because the controller has listeners to monitor the cluster state and has all the information about topic metadata. I am thinking we can add a disable metadata refresh flag to NetworkClient or set
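A rough sketch of the variant Becket describes, where the refresh logic moves behind the Metadata interface and the client is passed in as an argument. All names below are assumptions made for illustration, not the eventual patch:

import org.apache.kafka.common.Cluster;

// Stand-in for NetworkClient / KafkaClient; only what the sketch needs.
interface ClientHandle {
    void sendMetadataRequest(long nowMs);   // details elided
}

// Metadata owns its own refresh decision and uses the supplied client,
// instead of NetworkClient.poll() driving maybeUpdateMetadata() itself.
interface Metadata {
    Cluster fetch();
    void maybeUpdate(ClientHandle client, long nowMs);
}

// Controller flavor: cluster state comes from the controller's ZooKeeper
// listeners, so there is never anything to refresh over the wire.
class ControllerMetadata implements Metadata {
    private volatile Cluster cluster = Cluster.empty();
    public Cluster fetch() { return cluster; }
    public void maybeUpdate(ClientHandle client, long nowMs) { /* no-op */ }
    public void onClusterChange(Cluster newCluster) { this.cluster = newCluster; }
}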
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Jiangjie, Yes, I think using metadata timeout to expire batches in the record accumulator makes sense. Thanks, Jun On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: I incorporated Ewen and Guozhang’s comments in the KIP page. I want to speed up this KIP because currently we are very likely to see mirror maker hang when a broker is down. I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata timeout to expire the batches which are sitting in the accumulator without leader info. I did that because the situation there is essentially missing metadata. As a summary of what I am thinking about the timeouts in the new producer:
1. Metadata timeout: - used in send(), blocking - used in the accumulator to expire batches with a timeout exception.
2. Linger.ms - used in the accumulator to ready the batch for drain.
3. Request timeout - used in NetworkClient to expire a batch and retry if no response is received for a request before the timeout.
So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe changing the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG). I would like to see what people think of the above approach. Jiangjie (Becket) Qin On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote: Jun, I thought a little bit differently on this. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready because we don’t know which broker we should send the message to. So those sends need to be blocked on the metadata timeout. Another thing I’m wondering is in which scenario an offline partition will become online again in a short period of time and how likely that will occur. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches to be aborted. That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary. What do you think? Thanks, Jiangjie (Becket) Qin On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote: Jiangjie, Allowing messages to be accumulated in an offline partition could be useful since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too. Thanks, Jun On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases as Ewen suggested. But I’m not sure if it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
Jiangjie (Becket) Qin On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io wrote: Guozhang and Jiangjie, Isn’t this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please review the patch there? Thanks, Harsha On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote: Thanks for the update Jiangjie, I think it is actually NOT expected that hardware disconnection will be detected by the selector, but rather will only be revealed upon TCP timeout, which could be hours. A couple of comments on the wiki: 1. For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as an implicit timeout. I am not very clear on what this means. 2. Currently the producer already has a TIMEOUT_CONFIG which should really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming: admittedly it will change the config names but will reduce confusion moving forward. Guozhang On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Checked the code again. It seems that the disconnected channel is not detected by the selector as expected. Currently we are depending on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However the Selector.disconnected set is only
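To make the accumulator-expiry idea above concrete, here is a minimal, self-contained sketch. The field and method names are invented for illustration; this is not the KAFKA-2142 patch, just the shape of the check being discussed (fail batches that have waited past the metadata timeout without leader info):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Sketch: expire batches that have sat in the accumulator longer than the
// metadata timeout without obtaining leader info, so they stop holding on
// to the shared buffer.
class BatchExpiryCheck {
    static class RecordBatch {
        final long createdMs;
        boolean hasLeader;   // false while leader info is still missing
        RecordBatch(long createdMs) { this.createdMs = createdMs; }
        void abort(Exception e) { /* complete the batch's futures exceptionally */ }
    }

    static List<RecordBatch> expireStaleBatches(List<RecordBatch> queued,
                                                long nowMs,
                                                long metadataTimeoutMs) {
        List<RecordBatch> expired = new ArrayList<>();
        Iterator<RecordBatch> it = queued.iterator();
        while (it.hasNext()) {
            RecordBatch batch = it.next();
            if (!batch.hasLeader && nowMs - batch.createdMs > metadataTimeoutMs) {
                batch.abort(new TimeoutException("Batch expired while waiting for metadata"));
                expired.add(batch);
                it.remove();
            }
        }
        return expired;
    }
}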
Re: [DISCUSS] KIP-21 Configuration Management
Hey Jun, Where does the broker get the info about which ZK it needs to talk to? On Wednesday, May 6, 2015, Jun Rao j...@confluent.io wrote: Ashish, 3. Just want to clarify. Why can't you store ZK connection config in ZK? This is a property for ZK clients, not ZK server. Thanks, Jun
Re: Need Access to Wiki Page To Create Page for Discussion
Hi Jun, The account id is Bmis13. Thanks, Bhavesh On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote: What is your wiki user id? Thanks, Jun On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh
Re: Review Request 32650: Patch for KAFKA-2000
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32650/#review82775 --- Thanks for the updated patch. core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/32650/#comment133593 Sorry I didn't notice this earlier. This message is now slightly incorrect. Can we get a break-up of the number of offsets deleted due to expiration and due to topic deletion? BTW I'm touching this in KAFKA-2163 as well (which you may want to check out). core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala https://reviews.apache.org/r/32650/#comment133586 Pre-existing issue, but could you rename this to OffsetManagementTest? core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala https://reviews.apache.org/r/32650/#comment133587 testOffsetsDeletedAfterTopicDeletion core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala https://reviews.apache.org/r/32650/#comment133588 Can you use the more recent commit version (which has an explicit retention time?) core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala https://reviews.apache.org/r/32650/#comment133590 Can you also commit offsets for some other topic and verify that those offsets are _not_ deleted? As mentioned in the earlier RB, for the first scenario, we depend on the condition that the UpdateMetadataRequest is sent first. It would be good to have a unit test that explicitly tests this so we never unknowingly break that assumption. I don't have a good way to test this though :( If you have any ideas that would be great. Part of the issue is we have little to no test coverage on the controller. - Joel Koshy On May 3, 2015, 5:39 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32650/ --- (Updated May 3, 2015, 5:39 p.m.) Review request for kafka. Bugs: KAFKA-2000 https://issues.apache.org/jira/browse/KAFKA-2000 Repository: kafka Description --- KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted. Diffs - core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 652208a70f66045b854549d93cbbc2b77c24b10b Diff: https://reviews.apache.org/r/32650/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: [DISCUSS] KIP-21 Configuration Management
Ashish, 3. Just want to clarify. Why can't you store ZK connection config in ZK? This is a property for ZK clients, not ZK server. Thanks, Jun
Re: [DISCUSS] KIP-21 Configuration Management
I too would like to share some concerns that we came up with while discussing the effect that moving configs to ZooKeeper will have. 1. Kafka will start to become a configuration management tool to some degree, and be subject to all the things such tools are commonly asked to do. Kafka'll likely need to re-implement the role / group / service hierarchy that CM uses. Kafka'll need some way to conveniently dump its configs so they can be re-imported later, as a backup tool. People will want this to be audited, which means you'd need distinct logins for different people, and user management. You can try to push some of this stuff onto tools like CM, but this is Kafka going out of its way to be difficult to manage, and most projects don't want to do that. Being unique in how configuration is done is strictly a bad thing for both integration and usability. Probably lots of other stuff. Seems like a bad direction. 2. Where would the default config live? If we decide on keeping the config files around just for getting the default config, then I think on restart, the config file will be ignored. This creates an obnoxious asymmetry for how to configure Kafka the first time and how you update it. You have to learn 2 ways of making config changes. If there was a mistake in your original config file, you can't just edit the config file and restart, you have to go through the API. Reading configs is also more irritating. This all creates a learning curve for users of Kafka that will make it harder to use than other projects. This is also a backwards-incompatible change. 3. All Kafka configs living in ZK is strictly impossible, since at the very least ZK connection configs cannot be stored in ZK. So you will have a file where some values are in effect but others are not, which is again confusing. Also, since you are still reading the config file on first start, there are still multiple sources of truth, or at least the appearance of such to the user.
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated May 7, 2015, 1:32 a.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description (updated) --- Changes are: - protocol changes to the fetch reuqest and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases For now the patch will publish a zero delay and return a response Added more tests Addressing Jun's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b core/src/main/scala/kafka/server/AbstractFetcherThread.scala a439046e118b6efcc3a5a9d9e8acb79f85e40398 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 Diff: https://reviews.apache.org/r/33378/diff/ Testing --- New tests added Thanks, Aditya Auradkar
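As a rough illustration of the client-side metrics mentioned in the patch description above, here is a sketch using the common metrics classes. The sensor and metric names are invented and the actual patch may wire this up differently; the point is only to record the per-response throttle time and expose its average and maximum:

import java.util.Collections;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

// Sketch: record throttle_time_ms from each fetch/produce response and
// expose avg and max as client metrics.
class ThrottleTimeMetrics {
    private final Sensor throttleTimeSensor;

    ThrottleTimeMetrics(Metrics metrics) {
        throttleTimeSensor = metrics.sensor("throttle-time");
        throttleTimeSensor.add(new MetricName("throttle-time-avg", "client-metrics",
                "Average time in ms a request was throttled by the broker",
                Collections.<String, String>emptyMap()), new Avg());
        throttleTimeSensor.add(new MetricName("throttle-time-max", "client-metrics",
                "Maximum time in ms a request was throttled by the broker",
                Collections.<String, String>emptyMap()), new Max());
    }

    void record(long throttleTimeMs) {
        throttleTimeSensor.record(throttleTimeMs);
    }
}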
Re: [DISCUSS] KIP-21 Configuration Management
This raises one more concern. Just putting it out there. 4. What if one decides to move to a different ZK? I am sure there are other concerns with the status quo around this question, but it adds to the concern, right? Not sure, though, how big of a concern it practically is. On Wednesday, May 6, 2015, Ashish Singh asi...@cloudera.com wrote: Hey Jun, Where does the broker get the info about which ZK it needs to talk to? On Wednesday, May 6, 2015, Jun Rao j...@confluent.io wrote: Ashish, 3. Just want to clarify. Why can't you store ZK connection config in ZK? This is a property for ZK clients, not ZK server. Thanks, Jun
Re: Review Request 33378: Patch for KAFKA-2136
On May 4, 2015, 4:51 p.m., Jun Rao wrote: core/src/main/scala/kafka/api/FetchResponse.scala, lines 152-153 https://reviews.apache.org/r/33378/diff/1/?file=937083#file937083line152 This is tricky since FetchRequest is used in the follower as well. When doing a rolling upgrade of the broker to 0.8.3, we have to follow the following steps. 1. Configure intra.cluster.protocol to 0.8.2 and rolling upgrade each broker to 0.8.3. After this step, each broker understands version 1 of the fetch request, but still sends fetch request in version 0. 2. Configure intra.cluster.protocol to 0.8.3 and restart each broker. After this step, every broker will start sending fetch request in version 1. So, we need the logic in ReplicaFetcherThread to issue the right version of the FetchRequest according to intra.cluster.protocol. We also need to read the response according to the request version (i.e., can't just assume the response is always on the latest version). Good point. I'm passing in the protocol version to use on the fetchRequest to the AbstractFetcherThread. Also, not reading the response based on the request version. - Aditya --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/#review82389 --- On May 7, 2015, 1:36 a.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated May 7, 2015, 1:36 a.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description --- Changes are: - protocol changes to the fetch reuqest and response to return the throttle_time_ms to clients - New producer/consumer metrics to expose the avg and max delay time for a client - Test cases For now the patch will publish a zero delay and return a response Added more tests Addressing Jun's comments Formatting changes Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
31a2639477bf66f9a05d2b9b07794572d7ec393b core/src/main/scala/kafka/server/AbstractFetcherThread.scala a439046e118b6efcc3a5a9d9e8acb79f85e40398 core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620
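To illustrate the rolling-upgrade point Jun raises in this review, a small sketch of version-gating the fetch request. The config name, version numbers, and method names are assumptions taken from the thread, not the actual code; the idea is simply that the sender picks the request version from the configured inter-broker protocol and must remember that version when parsing the response:

// Sketch of the upgrade-ordering logic discussed above.
class FetchVersioningSketch {
    static short fetchRequestVersionFor(String intraClusterProtocolVersion) {
        // While any broker may still be on 0.8.2, keep sending version 0.
        return intraClusterProtocolVersion.startsWith("0.8.2") ? (short) 0 : (short) 1;
    }

    static boolean responseHasThrottleTime(short sentRequestVersion) {
        // throttle_time_ms only exists from response version 1 onward, so a
        // reader must parse according to the version it actually sent,
        // never assuming the latest format.
        return sentRequestVersion >= 1;
    }
}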
Re: Need Access to Wiki Page To Create Page for Discussion
Bhavesh, I could not find Bmis13 when adding you to the wiki permissions. Could you double check the account id? Guozhang On Wed, May 6, 2015 at 6:47 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jun, The account id is Bmis13. Thanks, Bhavesh On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote: What is your wiki user id? Thanks, Jun On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi All, I need access to create a Discussion or KIP document. Let me know what the process is for getting access. Thanks, Bhavesh -- -- Guozhang
Re: [DISCUSS] KIP-21 Configuration Management
One of the Chef users confirmed that Chef integration could still work if all configs are moved to ZK. My rough understanding of how Chef works is that a user first registers a service host with a Chef server. After that, a Chef client will be run on the service host. The user can then push config changes intended for a service/host to the Chef server. The server is then responsible for pushing the changes to Chef clients. Chef clients support pluggable logic. For example, it can generate a config file that Kafka broker will take. If we move all configs to ZK, we can customize the Chef client to use our config CLI to make the config changes in Kafka. In this model, one probably doesn't need to register every broker in Chef for the config push. Not sure if Puppet works in a similar way. Also for storing the configs, we probably can't store the broker/global level configs in Kafka itself (e.g. in a special topic). The reason is that in order to start a broker, we likely need to make some broker level config changes (e.g., the default log.dir may not be present, the default port may not be available, etc). If we need a broker to be up to make those changes, we get into this chicken and egg problem. Thanks, Jun On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com wrote: Sorry I missed the call today :) I think an additional requirement would be: Make sure that traditional deployment tools (Puppet, Chef, etc) are still capable of managing Kafka configuration. For this reason, I'd like the configuration refresh to be pretty close to what most Linux services are doing to force a reload of configuration. AFAIK, this involves handling HUP signal in the main thread to reload configuration. Then packaging scripts can add something nice like service kafka reload. (See Apache web server: https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101) Gwen On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote: Good discussion. Since we will be talking about this at 11am, I wanted to organize these comments into requirements to see if we are all on the same page. REQUIREMENT 1: Needs to accept dynamic config changes. This needs to be general enough to work for all configs that we envision may need to accept changes at runtime. e.g., log (topic), broker, client (quotas), etc.. possible options include: - ZooKeeper watcher - Kafka topic - Direct RPC to controller (or config coordinator) The current KIP is really focused on REQUIREMENT 1 and I think that is reasonable as long as we don't come up with something that requires significant re-engineering to support the other requirements. REQUIREMENT 2: Provide consistency of configs across brokers (modulo per-broker overrides) or at least be able to verify consistency. What this effectively means is that config changes must be seen by all brokers eventually and we should be able to easily compare the full config of each broker. REQUIREMENT 3: Central config store. Needs to work with plain file-based configs and other systems (e.g., puppet). Ideally, should not bring in other dependencies (e.g., a DB). Possible options: - ZooKeeper - Kafka topic - other? E.g. making it pluggable? Any other requirements? Thanks, Joel On Tue, May 05, 2015 at 01:38:09AM +, Aditya Auradkar wrote: Hey Neha, Thanks for the feedback. 1. In my earlier exchange with Jay, I mentioned the broker writing all it's configs to ZK (while respecting the overrides). Then ZK can be used to view all configs. 2. Need to think about this a bit more. 
Perhaps we can discuss this during the hangout tomorrow? 3 & 4) I viewed these config changes as mainly administrative operations. In that case, it may be reasonable to assume that the ZK port is available for communication from the machine these commands are run on. Having a ConfigChangeRequest (or similar) is nice to have, but having a new API and sending requests to the controller also changes how we do topic-based configuration currently. I was hoping to keep this KIP as minimal as possible and provide a means to represent and modify client and broker based configs in a central place. Are there any concerns if we tackle these things in a later KIP? Thanks, Aditya From: Neha Narkhede [n...@confluent.io] Sent: Sunday, May 03, 2015 9:48 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-21 Configuration Management Thanks for starting this discussion, Aditya. A few questions/comments 1. If you change the default values like it's mentioned in the KIP, do you also overwrite the local config file as part of updating the default value? If not, where does the admin look to find the default values, ZK or local Kafka config file? What if a config value is different in both places? 2. I share Gwen's concern
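As a rough sketch of the "service kafka reload" / HUP-signal idea Gwen mentions in this thread (purely illustrative; the interfaces below are invented, and sun.misc.Signal is JDK-internal but is the usual way to catch a signal from Java):

import sun.misc.Signal;
import sun.misc.SignalHandler;

// Sketch: on SIGHUP, re-read the config file and apply whatever subset of
// properties is safe to change at runtime, so packaging scripts can offer
// something like "service kafka reload".
class ConfigReloadOnHup {
    interface Reloadable {
        void reloadConfig();   // e.g. re-read server.properties and apply dynamic configs
    }

    static void install(final Reloadable server) {
        Signal.handle(new Signal("HUP"), new SignalHandler() {
            public void handle(Signal signal) {
                server.reloadConfig();
            }
        });
    }
}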
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Thanks for the review, Joel. I agree we don't need an init method; we can use configure. I'll update the KIP. -Harsha On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote: +1 with a minor comment: do we need an init method given it extends Configurable? Also, can you move this wiki out of drafts and add it to the table in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals? Thanks, Joel On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote: Thanks Jay. I removed partitioner.metadata from the KIP. I’ll send an updated patch. -- Harsha Sent with Airmail On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote: Thanks for the comments everyone. Hi Jay, I do have a question regarding the Configurable interface on how to pass in Map<String, ?> properties. I couldn’t find any other classes using it. JMX reporter overrides it but doesn’t implement it. So with a configurable partitioner, how can a user pass in partitioner configuration since it’s getting instantiated within the producer? Thanks, Harsha On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Harsha, That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines. -Jay On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote: Thanks Jay and Gianmarco for the comments. I picked option A: if the user sends a partition id then it will be applied, and the partitioner.class method will only be called if the partition id is null. Please take a look at the updated KIP here https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer . Let me know if you see anything missing. Thanks, Harsha On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote: Hi, Here are the questions I think we should consider: 1. Do we need this at all given that we have the partition argument in ProducerRecord which gives full control? I think we do need it because this is a way to plug in a different partitioning strategy at run time and do it in a fairly transparent way. Yes, we need it if we want to support different partitioning strategies inside Kafka rather than requiring the user to code them externally. 3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition. I am not entirely sure about this. I guess that most partitioners should not use it. I think it makes it easier to reason about the system if the partitioner only works on the key. However, if the value (and its serialization) are already available, there is not much harm in passing them along. 4. This interface doesn't include either an init() or close() method. It should implement Closeable and Configurable, right? Right now the only application I can think of to have an init() and close() is to read some state information (e.g., load information) that is published on some external distributed storage (e.g., zookeeper) by the brokers. It might be useful also for reconfiguration and state migration.
I think it's not a very common use case right now, but if the added complexity is not too much it might be worth having support for these methods. 5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner? Does the partition id just get passed in to the partitioner (as sort of implied in this interface)? This is a bit weird, since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case there is no point in including the partition in the Partitioner api)? The user should be able to override the partitioner on a per-record basis by explicitly setting the partition id. I don't think it makes sense for the partitioners to take hints on the partition. I would even go the extra step and have default logic that accepts both key and partition id (current interface) and calls partition() only if the partition id is not set. The partition() method does *not* take the partition ID as input (only key-value). Cheers, -- Gianmarco
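To make the option discussed in this thread concrete, here is a minimal Java sketch of what a pluggable partitioner could look like under option A: the producer consults partitioner.class only when the ProducerRecord carries no explicit partition id, and arbitrary string properties reach the partitioner through Configurable.configure(Map<String, ?>), which is why a separate partitioner.metadata property is unnecessary. The interface shape, the HashPartitioner class, and the "my.partitioner.seed" property are illustrative assumptions, not the interface finalized in the KIP.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.PartitionInfo;

// Illustrative interface only; the exact signature was still under discussion in this thread.
public interface Partitioner extends Configurable {
    // Called by the producer only when the ProducerRecord has no explicit partition id (option A).
    int partition(String topic, byte[] keyBytes, byte[] valueBytes, Cluster cluster);
}

// Example implementation: hash the serialized key; null keys fall back to a fixed partition.
class HashPartitioner implements Partitioner {
    private int seed = 0;

    @Override
    public void configure(Map<String, ?> configs) {
        // Arbitrary string properties arrive here at instantiation time inside the producer.
        // "my.partitioner.seed" is a made-up property name for illustration.
        Object s = configs.get("my.partitioner.seed");
        if (s != null)
            seed = Integer.parseInt(s.toString());
    }

    @Override
    public int partition(String topic, byte[] keyBytes, byte[] valueBytes, Cluster cluster) {
        // Assumes metadata for the topic is available, i.e. the partition list is non-empty.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null)
            return (seed & Integer.MAX_VALUE) % numPartitions;
        return ((Arrays.hashCode(keyBytes) ^ seed) & Integer.MAX_VALUE) % numPartitions;
    }
}

A user would then set partitioner.class to the implementation's class name and pass any extra keys (here the hypothetical my.partitioner.seed) in the producer properties; the producer hands the full config map to configure() when it instantiates the partitioner.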
[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14531857#comment-14531857 ] Aditya A Auradkar commented on KAFKA-2136: -- Updated reviewboard https://reviews.apache.org/r/33378/diff/ against branch origin/trunk Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2136: - Attachment: KAFKA-2136_2015-05-06_18:32:48.patch Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2136: - Attachment: KAFKA-2136_2015-05-06_18:35:54.patch Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, KAFKA-2136_2015-05-06_18:35:54.patch As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays
[ https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14531865#comment-14531865 ] Aditya A Auradkar commented on KAFKA-2136: -- Updated reviewboard https://reviews.apache.org/r/33378/diff/ against branch origin/trunk Client side protocol changes to return quota delays --- Key: KAFKA-2136 URL: https://issues.apache.org/jira/browse/KAFKA-2136 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, KAFKA-2136_2015-05-06_18:35:54.patch As described in KIP-13, evolve the protocol to return a throttle_time_ms in the Fetch and the ProduceResponse objects. Add client side metrics on the new producer and consumer to expose the delay time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33378: Patch for KAFKA-2136
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/ --- (Updated May 7, 2015, 1:36 a.m.) Review request for kafka and Joel Koshy. Bugs: KAFKA-2136 https://issues.apache.org/jira/browse/KAFKA-2136 Repository: kafka Description (updated) --- Changes are:
- protocol changes to the fetch request and response to return the throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a client
- Test cases
For now the patch will publish a zero delay and return a response. Added more tests. Addressing Jun's comments. Formatting changes.
Diffs (updated)
- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
- clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689
- clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f
- clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a
- clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff
- clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4
- clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5
- clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
- clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4
- clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8
- core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717
- core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c
- core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d
- core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330
- core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b
- core/src/main/scala/kafka/server/AbstractFetcherThread.scala a439046e118b6efcc3a5a9d9e8acb79f85e40398
- core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db
- core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda
- core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f
- core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a
- core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3
- core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17
- core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a
- core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620
- core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79
Diff: https://reviews.apache.org/r/33378/diff/ Testing --- New tests added Thanks, Aditya Auradkar
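For context on the client-side metrics mentioned in the description, here is a rough Java sketch of how a client could record the broker-reported throttle time with the common metrics library, so that avg and max delay show up as metrics. The sensor and metric names used here (throttle-time, throttle-time-avg, throttle-time-max) are assumptions for illustration, not necessarily the names used in the actual patch.

import java.util.Collections;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

// Records the throttle_time_ms field parsed out of a FetchResponse or ProduceResponse
// so that average and maximum delay are exposed as client metrics.
public class ThrottleTimeMetrics {
    private final Sensor throttleTimeSensor;

    public ThrottleTimeMetrics(Metrics metrics, String groupName) {
        this.throttleTimeSensor = metrics.sensor("throttle-time");
        this.throttleTimeSensor.add(
            new MetricName("throttle-time-avg", groupName,
                "Average time in ms a request was throttled by the broker",
                Collections.<String, String>emptyMap()),
            new Avg());
        this.throttleTimeSensor.add(
            new MetricName("throttle-time-max", groupName,
                "Maximum time in ms a request was throttled by the broker",
                Collections.<String, String>emptyMap()),
            new Max());
    }

    // Called from the response handler for every fetch or produce response received.
    public void record(int throttleTimeMs) {
        throttleTimeSensor.record(throttleTimeMs);
    }
}

Until brokers implement quotas, such a sensor simply records zeros (matching "For now the patch will publish a zero delay"), so the new metrics are additive and backward compatible for clients.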
Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.
Hey Guozhang, Good point about unifying the handling of MetadataResponse and other ClientResponses. Thanks for all the feedback! I’ll take a shot and upload a patch for review. Jiangjie (Becket) Qin On 5/6/15, 2:19 PM, Guozhang Wang wangg...@gmail.com wrote: Jiangjie, Just trying to figure out the class reference hierarchy of this approach (A -> B means A either has a member variable of B or takes B as an API parameter). Metadata will have an interface that takes in KafkaClient as a parameter, so Metadata -> KafkaClient 1. For producer: KafkaProducer -> Sender, KafkaProducer -> Metadata, Sender -> KafkaClient, Sender -> Metadata 2. For consumer: KafkaConsumer -> Coordinator, KafkaConsumer -> Fetcher, KafkaConsumer -> KafkaClient, KafkaConsumer -> Metadata, Coordinator -> KafkaClient, Coordinator -> Metadata, Fetcher -> KafkaClient, Fetcher -> Metadata 3. For controller: KafkaController -> KafkaClient 4. For replica fetcher: ReplicaFetcher -> KafkaClient For producer / consumer, the interleaving seems a bit complicated to me. Instead, we could completely remove the concept of Metadata from KafkaClient, such that NetworkClient.handleCompletedReceives does not specifically handle metadata responses, but just calls response.request().callback().onComplete(response) as well, which will try to update the metadata and check for any errors. Guozhang On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: Jun Ewen, Thanks a lot for the comments. Both of them look better than my original plan. Jun, one downside of not changing NetworkClient but using a different metadata implementation is that NetworkClient will still have all the metadata related code there, which makes it a little bit weird. I think Ewen’s approach solves this problem. Ewen, If I understand correctly, you are proposing something similar to the structure we are using in AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That will make NetworkClient look clean but increase the layers, which is probably OK. Inspired by your suggestions, I have another thought which seems closer to Jun’s idea. What if we move maybeUpdateMetadata()/handleMetadataResponse() and related logic in NetworkClient to Metadata and pass in NetworkClient as an argument? Like Jun suggested, we need a Metadata interface and different implementations. Thanks. Jiangjie (Becket) Qin On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote: +1 on trying to reuse the NetworkClient code. I think Jun's approach could work, but I'm wondering if refactoring a bit could get better separation of concerns without a somewhat awkward nop implementation of Metadata. I'm not sure what combination of delegation or subclassing makes sense yet, but here's another approach that I think could work: * Get rid of metadata stuff from NetworkClient. Add a subclass that also manages all the metadata. (Since it's used for both producer and consumer, the obvious name that I first jumped to is ClientNetworkClient, but somehow I think we can come up with something less confusing.) * leastLoadedNode is the only caller of metadata.fetch() in that class, maybeUpdateMetadata is the only caller of leastLoadedNode, maybeUpdateMetadata is only called in poll when a combination of metadata related timeouts end up being 0. These can be safely refactored into the subclass with one override of poll(). Same with metadataFetchInProgress assuming the rest of the changes below. * Some of the default implementations (e.g. 
handleMetadataResponse) can be left nops in NetworkClient and moved to the subclass. * Others can be overridden to call the super method then take the additional action necessary (e.g., on disconnect, move the metadata update request to the subclass). * Making the timeout handling in poll() work for both NetworkClient and the new base class might be the messiest part and might require breaking down the implementation of poll into multiple methods. * isReady uses metadataFetchInProgress and gets a timeout from the Metadata class. We can just override this method as well, though I feel like there's probably a cleaner solution. -Ewen On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote: Hi, Jiangjie, Thanks for taking on this. I was thinking that one way to decouple the dependency on Metadata in NetworkClient is the following. 1. Make Metadata an interface. 2. Rename current Metadata class to sth like KafkaMetadata that implements the Metadata interface. 3. Have a new NoOpMetadata class that implements the Metadata interface. This class 3.1 does nothing for any write method 3.2 returns max long for any method that asks for a timestamp 3.3. returns an empty Cluster for fetch(). Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata when using NetworkClient in the controller. The consumer/producer client will be using KafkaMetadata.
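As a minimal sketch of Jun's NoOpMetadata idea in Java, assuming a trimmed-down view of only the methods NetworkClient actually touches (the real Metadata class has more, and the method names below are simplified for illustration): write methods do nothing, the timeout-style method reports that no refresh is ever due, and fetch() returns an empty Cluster, so the leastLoadedNode/maybeUpdateMetadata paths stay harmless when the controller uses NetworkClient.

import org.apache.kafka.common.Cluster;

// Simplified view of the Metadata surface NetworkClient depends on; illustrative only.
interface Metadata {
    Cluster fetch();
    long timeToNextUpdate(long nowMs);
    void requestUpdate();
    void update(Cluster cluster, long nowMs);
    void failedUpdate(long nowMs);
}

// Passed to NetworkClient when it is used from the controller (or replica fetchers):
// no client-side metadata refresh is wanted, so writes are no-ops, no update is ever
// due, and fetch() reports an empty cluster.
class NoOpMetadata implements Metadata {
    @Override public Cluster fetch() { return Cluster.empty(); }
    @Override public long timeToNextUpdate(long nowMs) { return Long.MAX_VALUE; }
    @Override public void requestUpdate() { /* no-op */ }
    @Override public void update(Cluster cluster, long nowMs) { /* no-op */ }
    @Override public void failedUpdate(long nowMs) { /* no-op */ }
}

The producer and consumer would keep using the existing implementation (KafkaMetadata in Jun's naming) behind the same interface, which is what makes the controller able to reuse NetworkClient unchanged.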
[jira] [Commented] (KAFKA-2172) Round-robin partition assignment strategy too restrictive
[ https://issues.apache.org/jira/browse/KAFKA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14530767#comment-14530767 ] Bryan Baugher commented on KAFKA-2172: -- I originally asked about this on the mailing list[1], but I'm also having trouble with the round robin partitioning because of its requirements. Similar to the above, it makes deployments difficult: when changing our topic subscriptions, the consumer group stops consuming messages. In our case our consumers are building their topic subscriptions from config they retrieve regularly from a REST service. Every consumer should have the same topic subscription except when the config changes and there is some lag before all consumers retrieve the new config. Would you be open to a patch that provides another assignor which takes a simpler approach and just assigns each partition to a consumer interested in that topic with the least number of partitions assigned? This would not provide the optimal solution in the case where topic subscriptions are not equal, but it should generally do fine and should come up with the same answer as the round robin assignor when they are. [1] - http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3CCANZ-JHE6TRf%2BHdT-%3DK9AKFVXasLjg445cmcRVEBi5tG93XTNqA%40mail.gmail.com%3E Round-robin partition assignment strategy too restrictive - Key: KAFKA-2172 URL: https://issues.apache.org/jira/browse/KAFKA-2172 Project: Kafka Issue Type: Bug Reporter: Jason Rosenberg The round-robin partition assignment strategy was introduced for the high-level consumer, starting with 0.8.2.1. This appears to be a very attractive feature, but it has an unfortunate restriction which prevents it from being easily utilized: it requires all consumers in the consumer group to have identical topic regex selectors, and to have the same number of consumer threads. It turns out this is not always the case for our deployments. It's not unusual to run multiple consumers within a single process (with different topic selectors), or we might have multiple processes dedicated for different topic subsets. Agreed, we could change these to have separate group ids for each sub topic selector (but unfortunately, that's easier said than done). In several cases, we do at least have separate client.ids set for each sub-consumer, so it would be incrementally better if we could at least loosen the requirement such that each set of topics selected by a groupid/clientid pair are the same. But, if we want to do a rolling restart for a new version of a consumer config, the cluster will likely be in a state where it's not possible to have a single config until the full rolling restart completes across all nodes. This results in a consumer outage while the rolling restart is happening. Finally, it's especially problematic if we want to canary a new version for a period before rolling to the whole cluster. I'm not sure why this restriction should exist (as it obviously does not exist for the 'range' assignment strategy). It seems it could be made to work reasonably well with heterogeneous topic selection and heterogeneous thread counts. The documentation states that "The round-robin partition assignor lays out all the available partitions and all the available consumer threads. It then proceeds to do a round-robin assignment from partition to consumer thread." 
If the assignor can lay out all the available partitions and all the available consumer threads, it should be able to uniformly assign partitions to the available threads. In each case, if a thread belongs to a consumer that doesn't have that partition selected, just move to the next available thread that does have the selection, etc. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
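A rough Java sketch of the "least assigned" assignor Bryan proposes above, using simplified types (plain strings for threads and partitions) rather than the real assignor API: each partition goes to a thread that subscribes to its topic and currently owns the fewest partitions, which also handles the heterogeneous subscriptions described in this issue, since threads that did not select a topic are simply skipped.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Greedy "least assigned" layout. With identical subscriptions this comes out close to
// round-robin; with heterogeneous subscriptions no partition is dropped, it just goes to
// some interested thread.
public class LeastLoadedAssignor {

    public static Map<String, List<String>> assign(
            Map<String, Set<String>> subscriptions,        // thread id -> topics it subscribes to
            Map<String, List<String>> partitionsPerTopic)  // topic -> its partitions
    {
        Map<String, List<String>> assignment = new HashMap<>();
        for (String thread : subscriptions.keySet())
            assignment.put(thread, new ArrayList<String>());

        for (Map.Entry<String, List<String>> topicEntry : partitionsPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            for (String partition : topicEntry.getValue()) {
                String leastLoaded = null;
                for (String thread : subscriptions.keySet()) {
                    if (!subscriptions.get(thread).contains(topic))
                        continue;   // skip threads whose consumer did not select this topic
                    if (leastLoaded == null
                            || assignment.get(thread).size() < assignment.get(leastLoaded).size())
                        leastLoaded = thread;
                }
                if (leastLoaded != null)
                    assignment.get(leastLoaded).add(partition);  // topics nobody selected stay unassigned
            }
        }
        return assignment;
    }
}

This is only a sketch of the idea from the comment; a production assignor would also need deterministic tie-breaking so that all consumers independently compute the same assignment.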