Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-06 Thread Ewen Cheslack-Postava
+1 on trying to reuse the NetworkClient code.

I think Jun's approach could work, but I'm wondering if refactoring a bit
could get better separation of concerns without a somewhat awkward nop
implementation of Metadata. I'm not sure what combination of delegation or
subclassing makes sense yet, but here's another approach that I think could
work:

* Get rid of metadata stuff from NetworkClient. Add a subclass that also
manages all the metadata. (Since it's used for both producer and consumer,
the obvious name that I first jumped to is ClientNetworkClient, but somehow
I think we can come up with something less confusing.)
* leastLoadedNode is the only caller of metadata.fetch() in that class,
maybeUpdateMetadata is the only caller of leastLoadedNode,
maybeUpdateMetadata is only called in poll when a combination of metadata
related timeouts end up being 0. These can be safely refactored into the
subclass with one override of poll(). Same with metadataFetchInProgress
assuming the rest of the changes below.
* Some of the default implementations (e.g. handleMetadataResponse) can be
left nops in NetworkClient and moved to the subclass.
* Others can be overridden to call the super method then take the
additional action necessary (e.g., on disconnect, move the metadata update
request to the subclass).
* Making the timeout handling in poll() work for both NetworkClient and the
new base class might be the messiest part and might require breaking down
the implementation of poll into multiple methods.
* isReady uses metadataFetchInProgress and gets a timeout from the Metadata
class. We can just override this method as well, though I feel like there's
probably a cleaner solution.

-Ewen


On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:

 Hi, Jiangjie,

 Thanks for taking on this.

 I was thinking that one way to decouple the dependency on Metadata in
 NetworkClient is the following.
 1. Make Metadata an interface.
 2. Rename current Metadata class to sth like KafkaMetadata that implements
 the Metadata interface.
 3. Have a new NoOpMetadata class that implements the Metadata interface.
 This class
 3.1 does nothing for any write method
 3.2 returns max long for any method that asks for a timestamp
 3.3. returns an empty Cluster for fetch().

 Then we can leave NetworkClient unchanged and just pass in a NoOpMetadata
 when using NetworkClient in the controller. The consumer/producer client
 will be using KafkaMetadata.

 As for replica fetchers, it may be possible to use KafkaConsumer. However,
 we don't need the metadata and the offset management. So, perhaps it's
 easier to just use NetworkClient. Also, currently, there is one replica
 fetcher thread per source broker. By using NetworkClient, we can change
 that to using a single thread for all source brokers. This is probably a
 bigger change. So, maybe we can do it later.

 Jun


 I think we probably need to replace replica fetcher with NetworkClient as
 well. Replica fetcher gets leader from the controller and therefore doesn't

 On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  I am trying to see if we can reuse the NetworkClient class to be used in
  controller to broker communication. (Also, we can probably use
  KafkaConsumer which is already using NetworkClient in replica fetchers).
  Currently NetworkClient does the following things in addition to sending
  requests.
 
1.  Connection state management.
2.  Flow control (inflight requests)
3.  Metadata refresh
 
  In controller we need (1) and (2) but not (3). NetworkClient is tightly
  coupled with metadata now and this is the major blocker of reusing
  NetworkClient in controller.  For controller, we don’t need NetworkClient
  to manage any metadata because the controller has listeners to monitor
 the
  cluster state and has all the information about topic metadata.
  I am thinking we can add a disable metadata refresh flag to NetworkClient
  or set metadata refresh interval to be Long.MAX_VALUE, so the metadata
 will
  be managed outside NetworkClient.
  This needs minimal change to allow NetworkClient to be reused, but the
  ugly part is NetworkClient still has the entire Metadata while it
 actually
  only needs a NodeList.
 
  Want to see what do people think about this.
 
  Thanks.
 
  Jiangjie (Becket) Qin
 
 




-- 
Thanks,
Ewen


[jira] [Created] (KAFKA-2173) Kafka died after throw more error

2015-05-06 Thread Truyet Nguyen (JIRA)
Truyet Nguyen created KAFKA-2173:


 Summary: Kafka died after throw more error
 Key: KAFKA-2173
 URL: https://issues.apache.org/jira/browse/KAFKA-2173
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2.1
 Environment: VPS Server CentOs 6.6 4G Ram
Reporter: Truyet Nguyen


Kafka is died after server.log throw more error: 

[2015-05-05 16:08:34,616] ERROR Closing socket for /127.0.0.1 because of error 
(kafka.network.Processor)
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows

2015-05-06 Thread Honghai Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530071#comment-14530071
 ] 

Honghai Chen commented on KAFKA-2170:
-

[~junrao]  this failure not related with OffsetIndex.resize() .  This issue is 
related with file.rename()  failure on windows if the file is opened.


For the offsetIndex.resize() issue,  after add log, I see the unmap has been 
triggered, but still get that exception.
Add below case to LogSegmentTest will get exception,  
java.io.IOException: The requested operation cannot be performed on a file w
ith a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply$mcV$sp(OffsetIndex.scal
a:299)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:284)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:284)
at kafka.log.OffsetIndex.maybeLock(OffsetIndex.scala:406)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:284)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetI
ndex.scala:273)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:271)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:271)
at kafka.log.OffsetIndex.maybeLock(OffsetIndex.scala:406)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
at kafka.log.LogSegment.recover(LogSegment.scala:201)
at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash2(LogSegmentT
est.scala:304)

  def testCreateWithInitFileSizeCrash() {
val tempDir = TestUtils.tempDir()
val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime)
val ms = messages(50, hello, there)
seg.append(50, ms)
val ms2 = messages(60, alpha, beta)
seg.append(60, ms2)
val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
assertEquals(ms2.toList, read.messageSet.toList)
val oldSize = seg.log.sizeInBytes()
val oldPosition = seg.log.channel.position
val oldFileSize = seg.log.file.length
seg.flush()
seg.log.channel.close()
seg.index.close()

val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0,  SystemTime, true)
segReopen.recover(64*1024)
val size = segReopen.log.sizeInBytes()
val position = segReopen.log.channel.position
val fileSize = segReopen.log.file.length
assertEquals(oldPosition, position)
assertEquals(oldSize, size)
assertEquals(size, fileSize)
  }

 10 LogTest cases failed for  file.renameTo failed under windows
 ---

 Key: KAFKA-2170
 URL: https://issues.apache.org/jira/browse/KAFKA-2170
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.9.0
 Environment: Windows
Reporter: Honghai Chen
Assignee: Jay Kreps

 get latest code from trunk, then run test 
 gradlew  -i core:test --tests kafka.log.LogTest
 Got 10 cases failed for same reason:
 kafka.common.KafkaStorageException: Failed to change the log file suffix from 
  to .deleted for log segment 0
   at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
   at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
   at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
   at scala.collection.immutable.List.foreach(List.scala:318)
   at kafka.log.Log.deleteOldSegments(Log.scala:514)
   at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at 
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:601)
   at 
 org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
   at 
 org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
   at 
 org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
   at 
 org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
   at 
 org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
   at 
 org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
   at 
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
   at 
 org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
   at 

[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Attachment: KAFKA-2174.patch

 Wrong TopicMetadata deserialization
 ---

 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2174.patch


 TopicMetadata.readFrom assumes that ByteBuffer always contains the full set 
 of partitions but it is not true. On incomplete metadata we will get 
 java.lang.ArrayIndexOutOfBoundsException:
 {code}
 java.lang.ArrayIndexOutOfBoundsException: 47
 at 
 kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.Range.foreach(Range.scala:141)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
 {code}
 We sometimes get this exceptions on any broker restart (kill -TERM, 
 controlled.shutdown.enable=false).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexey Ozeritskiy updated KAFKA-2174:
-
Status: Patch Available  (was: Open)

 Wrong TopicMetadata deserialization
 ---

 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy
 Attachments: KAFKA-2174.patch


 TopicMetadata.readFrom assumes that ByteBuffer always contains the full set 
 of partitions but it is not true. On incomplete metadata we will get 
 java.lang.ArrayIndexOutOfBoundsException:
 {code}
 java.lang.ArrayIndexOutOfBoundsException: 47
 at 
 kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.Range.foreach(Range.scala:141)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
 {code}
 We sometimes get this exceptions on any broker restart (kill -TERM, 
 controlled.shutdown.enable=false).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-05-06 Thread Sriharsha Chintalapani
Thanks Jay. I removed partitioner.metadata from KIP. I’ll send an updated patch.

-- 
Harsha
Sent with Airmail

On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
wrote:

Thanks for the comments everyone.
Hi Jay,
     I do have a question regarding configurable interface on how to pass a 
MapString, ? properties. I couldn’t find any other classes using it. JMX 
reporter overrides it but doesn’t implement it.  So with configurable 
partitioner how can a user pass in partitioner configuration since its getting 
instantiated within the producer.

Thanks,
Harsha


On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,

That proposal sounds good. One minor thing--I don't think we need to have
the partitioner.metadata property. Our reason for using string properties
is exactly to make config extensible at runtime. So a given partitioner can
add whatever properties make sense using the configure() api it defines.

-Jay

On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:

 Thanks Jay  Gianmarco for the comments. I picked the option A, if user
 sends a partition id than it will applied and partitioner.class method
 will only called if partition id is null .
 Please take a look at the updated KIP here

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
 . Let me know if you see anything missing.

 Thanks,
 Harsha

 On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:
  Hi,
 
 
  Here are the questions I think we should consider:
   1. Do we need this at all given that we have the partition argument in
   ProducerRecord which gives full control? I think we do need it because
 this
   is a way to plug in a different partitioning strategy at run time and
 do it
   in a fairly transparent way.
  
 
  Yes, we need it if we want to support different partitioning strategies
  inside Kafka rather than requiring the user to code them externally.
 
 
   3. Do we need to add the value? I suspect people will have uses for
   computing something off a few fields in the value to choose the
 partition.
   This would be useful in cases where the key was being used for log
   compaction purposes and did not contain the full information for
 computing
   the partition.
  
 
  I am not entirely sure about this. I guess that most partitioners should
  not use it.
  I think it makes it easier to reason about the system if the partitioner
  only works on the key.
  Hoever, if the value (and its serialization) are already available, there
  is not much harm in passing them along.
 
 
   4. This interface doesn't include either an init() or close() method.
 It
   should implement Closable and Configurable, right?
  
 
  Right now the only application I can think of to have an init() and
  close()
  is to read some state information (e.g., load information) that is
  published on some external distributed storage (e.g., zookeeper) by the
  brokers.
  It might be useful also for reconfiguration and state migration.
 
  I think it's not a very common use case right now, but if the added
  complexity is not too much it might be worth to have support for these
  methods.
 
 
 
   5. What happens if the user both sets the partition id in the
   ProducerRecord and sets a partitioner? Does the partition id just get
   passed in to the partitioner (as sort of implied in this interface?).
 This
   is a bit weird since if you pass in the partition id you kind of
 expect it
   to get used, right? Or is it the case that if you specify a partition
 the
   partitioner isn't used at all (in which case no point in including
   partition in the Partitioner api).
  
  
  The user should be able to override the partitioner on a per-record basis
  by explicitly setting the partition id.
  I don't think it makes sense for the partitioners to take hints on the
  partition.
 
  I would even go the extra step, and have a default logic that accepts
  both
  key and partition id (current interface) and calls partition() only if
  the
  partition id is not set. The partition() method does *not* take the
  partition ID as input (only key-value).
 
 
  Cheers,
  --
  Gianmarco
 
 
 
   Cheers,
  
   -Jay
  
   On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani 
 ka...@harsha.io
   wrote:
  
Hi,
Here is the KIP for adding a partitioner interface for
 producer.
   
   
  
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how interface should look like.
 Please
take a look and let me know if you prefer one way or the other.
   
Thanks,
Harsha
   
   
  



[jira] [Created] (KAFKA-2174) Wrong TopicMetadata deserialization

2015-05-06 Thread Alexey Ozeritskiy (JIRA)
Alexey Ozeritskiy created KAFKA-2174:


 Summary: Wrong TopicMetadata deserialization
 Key: KAFKA-2174
 URL: https://issues.apache.org/jira/browse/KAFKA-2174
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Alexey Ozeritskiy


TopicMetadata.readFrom assumes that ByteBuffer always contains the full set of 
partitions but it is not true. On incomplete metadata we will get 
java.lang.ArrayIndexOutOfBoundsException:
{code}
java.lang.ArrayIndexOutOfBoundsException: 47
at 
kafka.api.TopicMetadata$$anonfun$readFrom$1.apply$mcVI$sp(TopicMetadata.scala:38)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at kafka.api.TopicMetadata$.readFrom(TopicMetadata.scala:36)
at 
kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at 
kafka.api.TopicMetadataResponse$$anonfun$3.apply(TopicMetadataResponse.scala:31)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
kafka.api.TopicMetadataResponse$.readFrom(TopicMetadataResponse.scala:31)
{code}
We sometimes get this exceptions on any broker restart (kill -TERM, 
controlled.shutdown.enable=false).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-06 Thread Jiangjie Qin
Yes, that is the plan.

On 5/5/15, 8:23 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

Just a quick question, can we handle REQUEST TIMEOUT as disconnections and
do a fresh MetaDataRequest and retry instead of failing the request?


Thanks,

Mayuresh


On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I incorporated Ewen and Guozhang’s comments in the KIP page. Want to
speed
 up on this KIP because currently we experience mirror-maker hung very
 likely when a broker is down.

 I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
 timeout to expire the batches which are sitting in accumulator without
 leader info. I did that because the situation there is essentially
missing
 metadata.

 As a summary of what I am thinking about the timeout in new Producer:

 1. Metadata timeout:
   - used in send(), blocking
   - used in accumulator to expire batches with timeout exception.
 2. Linger.ms
   - Used in accumulator to ready the batch for drain
 3. Request timeout
   - Used in NetworkClient to expire a batch and retry if no response is
 received for a request before timeout.

 So in this KIP, we only address (3). The only public interface change
is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

 Would like to see what people think of above approach?

 Jiangjie (Becket) Qin

 On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

 Jun,
 
 I thought a little bit differently on this.
 Intuitively, I am thinking that if a partition is offline, the metadata
 for that partition should be considered not ready because we don’t know
 which broker we should send the message to. So those sends need to be
 blocked on metadata timeout.
 Another thing I’m wondering is in which scenario an offline partition
will
 become online again in a short period of time and how likely it will
 occur. My understanding is that the batch timeout for batches sitting
in
 accumulator should be larger than linger.ms but should not be too long
 (e.g. less than 60 seconds). Otherwise it will exhaust the shared
buffer
 with batches to be aborted.
 
 That said, I do agree it is reasonable to buffer the message for some
time
 so messages to other partitions can still get sent. But adding another
 expiration in addition to linger.ms - which is essentially a timeout -
 sounds a little bit confusing. Maybe we can do this, let the batch sit
in
 accumulator up to linger.ms, then fail it if necessary.
 
 What do you think?
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
 
 Jiangjie,
 
 Allowing messages to be accumulated in an offline partition could be
 useful
 since the partition may become available before the request timeout or
 linger time is reached. Now that we are planning to add a new
timeout, it
 would be useful to think through whether/how that applies to messages
in
 the accumulator too.
 
 Thanks,
 
 Jun
 
 
 On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
 wrote:
 
  Hi Harsha,
 
  Took a quick look at the patch. I think it is still a little bit
  different. KAFKA-1788 only handles the case where a batch sitting in
  accumulator for too long. The KIP is trying to solve the issue
where a
  batch has already been drained from accumulator and sent to broker.
  We might be able to apply timeout on batch level to merge those two
 cases
  as Ewen suggested. But I’m not sure if it is a good idea to allow
 messages
  whose target partition is offline to sit in accumulator in the first
 place.
 
  Jiangjie (Becket) Qin
 
  On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:
 
  Guozhang and Jiangjie,
   Isn’t this work being covered in
  https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please
the
  review the patch there.
  Thanks,
  Harsha
  
  
  On April 15, 2015 at 10:39:40 PM, Guozhang Wang
(wangg...@gmail.com)
  wrote:
  
  Thanks for the update Jiangjie,
  
  I think it is actually NOT expected that hardware disconnection
will
 be
  detected by the selector, but rather will only be revealed upon TCP
  timeout, which could be hours.
  
  A couple of comments on the wiki:
  
  1. For KafkaProducer.close() and KafkaProducer.flush() we need the
  request
  timeout as implict timeout. I am not very clear what does this
mean?
  
  2. Currently the producer already has a TIMEOUT_CONFIG which
should
  really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
  REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming:
 admittedly
  
  it will change the config names but will reduce confusions moving
  forward.
  
  
  Guozhang
  
  
  On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   Checked the code again. It seems that the disconnected channel is
 not
   detected by selector as expected.
  
   Currently we are depending on the
   

Review Request 33916: Patch for KAFKA-2163

2015-05-06 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33916/
---

Review request for kafka.


Bugs: KAFKA-2163
https://issues.apache.org/jira/browse/KAFKA-2163


Repository: kafka


Description
---

fix


renames and logging improvements


Diffs
-

  core/src/main/scala/kafka/cluster/Partition.scala 
122b1dbbe45cb27aed79b5be1e735fb617c716b0 
  core/src/main/scala/kafka/server/OffsetManager.scala 
18680ce100f10035175cc0263ba7787ab0f6a17a 

Diff: https://reviews.apache.org/r/33916/diff/


Testing
---


Thanks,

Joel Koshy



[jira] [Updated] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets

2015-05-06 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2163:
--
Attachment: KAFKA-2163.patch

 Offsets manager cache should prevent stale-offset-cleanup while an offset 
 load is in progress; otherwise we can lose consumer offsets
 -

 Key: KAFKA-2163
 URL: https://issues.apache.org/jira/browse/KAFKA-2163
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
 Fix For: 0.8.3

 Attachments: KAFKA-2163.patch


 When leadership of an offsets partition moves, the new leader loads offsets 
 from that partition into the offset manager cache.
 Independently, the offset manager has a periodic cleanup task for stale 
 offsets that removes old offsets from the cache and appends tombstones for 
 those. If the partition happens to contain much older offsets (earlier in the 
 log) and inserts those into the cache; the cleanup task may run and see those 
 offsets (which it deems to be stale) and proceeds to remove from the cache 
 and append a tombstone to the end of the log. The tombstone will override the 
 true latest offset and a subsequent offset fetch request will return no 
 offset.
 We just need to prevent the cleanup task from running during an offset load.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets

2015-05-06 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-2163:
--
Assignee: Joel Koshy
  Status: Patch Available  (was: Open)

 Offsets manager cache should prevent stale-offset-cleanup while an offset 
 load is in progress; otherwise we can lose consumer offsets
 -

 Key: KAFKA-2163
 URL: https://issues.apache.org/jira/browse/KAFKA-2163
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
Assignee: Joel Koshy
 Fix For: 0.8.3

 Attachments: KAFKA-2163.patch


 When leadership of an offsets partition moves, the new leader loads offsets 
 from that partition into the offset manager cache.
 Independently, the offset manager has a periodic cleanup task for stale 
 offsets that removes old offsets from the cache and appends tombstones for 
 those. If the partition happens to contain much older offsets (earlier in the 
 log) and inserts those into the cache; the cleanup task may run and see those 
 offsets (which it deems to be stale) and proceeds to remove from the cache 
 and append a tombstone to the end of the log. The tombstone will override the 
 true latest offset and a subsequent offset fetch request will return no 
 offset.
 We just need to prevent the cleanup task from running during an offset load.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33065: Patch for KAFKA-1928

2015-05-06 Thread Jun Rao


 On April 18, 2015, 12:49 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, lines 66-72
  https://reviews.apache.org/r/33065/diff/1/?file=922627#file922627line66
 
  I am wondering if we need both completed() and remaining() in Send. It 
  seems that one of the two is enough for our usage.
  
  Also, not sure how useful reify() is. Currently, it's not used anywhere.
 
 Gwen Shapira wrote:
 reification is a very central concept in Kafka's The Trial :)
 
 I agree that reify() and remaining() can be removed without any pain. 
 Since it is a public API, do we need to deprecate them first?

Yes, we should we able to remove them since the api that's public is in 
kafka.javaapi.FetchResponse.


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review80557
---


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33065/
 ---
 
 (Updated May 1, 2015, 10:45 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1928
 https://issues.apache.org/jira/browse/KAFKA-1928
 
 
 Repository: kafka
 
 
 Description
 ---
 
 first pass on replacing Send
 
 
 implement maxSize and improved docs
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Conflicts:
   core/src/main/scala/kafka/network/RequestChannel.scala
 
 moved selector out of abstract thread
 
 
 mid-way through putting selector in SocketServer
 
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
 pass.
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-1928-v2
 
 
 renamed requestKey to connectionId to reflect new use and changed type from 
 Any to String
 
 
 Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
 as well
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
 da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
 1311f85847b022efec8cb05c450bb18231db6979 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  e55ab11df4db0b0084f841a74cbcf819caf780d5 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   
 clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
 c8213e156ec9c9af49ee09f5238492318516aaa3 
   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
 PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
 fc0d168324aaebb97065b0aafbd547a1994d76a7 
   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
 68327cd3a734fd429966d3e2016a2488dbbb19e5 
   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
 b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
   clients/src/main/java/org/apache/kafka/common/network/Send.java 
 5d321a09e470166a1c33639cf0cab26a3bce98ec 
   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
 PRE-CREATION 
   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
 5e3fab13e3c02eb351558ec973b949b3d1196085 
   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
 8b278892883e63899b53e15efb9d8c926131e858 
   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
 d5b306b026e788b4e5479f3419805aa49ae889f3 
   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
 ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
 

[jira] [Commented] (KAFKA-2150) FetcherThread backoff need to grab lock before wait on condition.

2015-05-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531695#comment-14531695
 ] 

Sriharsha Chintalapani commented on KAFKA-2150:
---

[~guozhang] Can you please take a look at this patch . Thanks.

 FetcherThread backoff need to grab lock before wait on condition.
 -

 Key: KAFKA-2150
 URL: https://issues.apache.org/jira/browse/KAFKA-2150
 Project: Kafka
  Issue Type: Bug
Reporter: Jiangjie Qin
Assignee: Sriharsha Chintalapani
  Labels: newbie++
 Attachments: KAFKA-2150.patch, KAFKA-2150_2015-04-25_13:14:05.patch, 
 KAFKA-2150_2015-04-25_13:18:35.patch, KAFKA-2150_2015-04-25_13:35:36.patch


 Saw the following error: 
 kafka.api.ProducerBounceTest  testBrokerFailure STANDARD_OUT
 [2015-04-25 00:40:43,997] ERROR [ReplicaFetcherThread-0-0], Error due to  
 (kafka.server.ReplicaFetcherThread:103)
 java.lang.IllegalMonitorStateException
   at 
 java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:127)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1239)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1668)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2107)
   at 
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 [2015-04-25 00:40:47,064] ERROR [ReplicaFetcherThread-0-1], Error due to  
 (kafka.server.ReplicaFetcherThread:103)
 java.lang.IllegalMonitorStateException
   at 
 java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:127)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1239)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(AbstractQueuedSynchronizer.java:1668)
   at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2107)
   at 
 kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
 We should grab the lock before waiting on the condition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-4 Admin Commands / Phase-1

2015-05-06 Thread Joe Stein
+1 (binding)

~ Joe Stein
- - - - - - - - - - - - - - - - -

  http://www.stealth.ly
- - - - - - - - - - - - - - - - -

On Tue, May 5, 2015 at 11:16 AM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Hi all,

 This is a voting thread for KIP-4 Phase-1. It will include Wire protocol
 changes
 and server side handling code.


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

 Thanks,
 Andrii Biletskyi



Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-05-06 Thread Joel Koshy
+1 with a minor comment: do we need an init method given it extends
Configurable?

Also, can you move this wiki out of drafts and add it to the table in
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?

Thanks,

Joel

On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote:
 Thanks Jay. I removed partitioner.metadata from KIP. I’ll send an updated 
 patch.
 
 -- 
 Harsha
 Sent with Airmail
 
 On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
 wrote:
 
 Thanks for the comments everyone.
 Hi Jay,
      I do have a question regarding configurable interface on how to pass a 
 MapString, ? properties. I couldn’t find any other classes using it. JMX 
 reporter overrides it but doesn’t implement it.  So with configurable 
 partitioner how can a user pass in partitioner configuration since its 
 getting instantiated within the producer.
 
 Thanks,
 Harsha
 
 
 On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:
 
 Hey Harsha,
 
 That proposal sounds good. One minor thing--I don't think we need to have
 the partitioner.metadata property. Our reason for using string properties
 is exactly to make config extensible at runtime. So a given partitioner can
 add whatever properties make sense using the configure() api it defines.
 
 -Jay
 
 On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:
 
  Thanks Jay  Gianmarco for the comments. I picked the option A, if user
  sends a partition id than it will applied and partitioner.class method
  will only called if partition id is null .
  Please take a look at the updated KIP here
 
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
  . Let me know if you see anything missing.
 
  Thanks,
  Harsha
 
  On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:
   Hi,
  
  
   Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in
ProducerRecord which gives full control? I think we do need it because
  this
is a way to plug in a different partitioning strategy at run time and
  do it
in a fairly transparent way.
   
  
   Yes, we need it if we want to support different partitioning strategies
   inside Kafka rather than requiring the user to code them externally.
  
  
3. Do we need to add the value? I suspect people will have uses for
computing something off a few fields in the value to choose the
  partition.
This would be useful in cases where the key was being used for log
compaction purposes and did not contain the full information for
  computing
the partition.
   
  
   I am not entirely sure about this. I guess that most partitioners should
   not use it.
   I think it makes it easier to reason about the system if the partitioner
   only works on the key.
   Hoever, if the value (and its serialization) are already available, there
   is not much harm in passing them along.
  
  
4. This interface doesn't include either an init() or close() method.
  It
should implement Closable and Configurable, right?
   
  
   Right now the only application I can think of to have an init() and
   close()
   is to read some state information (e.g., load information) that is
   published on some external distributed storage (e.g., zookeeper) by the
   brokers.
   It might be useful also for reconfiguration and state migration.
  
   I think it's not a very common use case right now, but if the added
   complexity is not too much it might be worth to have support for these
   methods.
  
  
  
5. What happens if the user both sets the partition id in the
ProducerRecord and sets a partitioner? Does the partition id just get
passed in to the partitioner (as sort of implied in this interface?).
  This
is a bit weird since if you pass in the partition id you kind of
  expect it
to get used, right? Or is it the case that if you specify a partition
  the
partitioner isn't used at all (in which case no point in including
partition in the Partitioner api).
   
   
   The user should be able to override the partitioner on a per-record basis
   by explicitly setting the partition id.
   I don't think it makes sense for the partitioners to take hints on the
   partition.
  
   I would even go the extra step, and have a default logic that accepts
   both
   key and partition id (current interface) and calls partition() only if
   the
   partition id is not set. The partition() method does *not* take the
   partition ID as input (only key-value).
  
  
   Cheers,
   --
   Gianmarco
  
  
  
Cheers,
   
-Jay
   
On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani 
  ka...@harsha.io
wrote:
   
 Hi,
 Here is the KIP for adding a partitioner interface for
  producer.


   
  

Re: Need Access to Wiki Page To Create Page for Discussion

2015-05-06 Thread Jun Rao
What your wiki user id?

Thanks,

Jun

On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi All,

 I need access to create Discussion or KIP document.  Let me know what is
 process of getting access.

 Thanks,

 Bhavesh



[jira] [Commented] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list

2015-05-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531672#comment-14531672
 ] 

Guozhang Wang commented on KAFKA-2160:
--

Updated reviewboard https://reviews.apache.org/r/33731/diff/
 against branch origin/trunk

 DelayedOperationPurgatory should remove the pair in watchersForKey with empty 
 watchers list
 ---

 Key: KAFKA-2160
 URL: https://issues.apache.org/jira/browse/KAFKA-2160
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch, 
 KAFKA-2160_2015-05-06_16:31:48.patch


 With purgatory usage in consumer coordinator, it will be common that watcher 
 lists are very short and live only for a short time. So we'd better clean 
 them from the watchersForKey Pool once the list become empty in 
 checkAndComplete() calls. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33731: Second Attempt to Fix KAFKA-2160

2015-05-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33731/
---

(Updated May 6, 2015, 11:31 p.m.)


Review request for kafka.


Summary (updated)
-

Second Attempt to Fix KAFKA-2160


Bugs: KAFKA-2160
https://issues.apache.org/jira/browse/KAFKA-2160


Repository: kafka


Description (updated)
---

Add a per-key lock in Pool which is used for non-primitive UpdateAndMaybeRemove 
function


Diffs (updated)
-

  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
aa8d9404a3e78a365df06404b79d0d8f694b4bd6 
  core/src/main/scala/kafka/server/DelayedOperation.scala 
2ed9b467c2865e5717d7f6fd933cd09a5c5b22c0 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/utils/Pool.scala 
9ddcde797341ddd961923cafca16472d84417b5a 
  core/src/main/scala/kafka/utils/timer/Timer.scala 
b8cde820a770a4e894804f1c268b24b529940650 
  core/src/test/scala/unit/kafka/log/LogConfigTest.scala 
f3546adee490891e0d8d0214bef00b1dd7f42227 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 

Diff: https://reviews.apache.org/r/33731/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-2160) DelayedOperationPurgatory should remove the pair in watchersForKey with empty watchers list

2015-05-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2160:
-
Attachment: KAFKA-2160_2015-05-06_16:31:48.patch

 DelayedOperationPurgatory should remove the pair in watchersForKey with empty 
 watchers list
 ---

 Key: KAFKA-2160
 URL: https://issues.apache.org/jira/browse/KAFKA-2160
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Attachments: KAFKA-2160.patch, KAFKA-2160_2015-04-30_15:20:14.patch, 
 KAFKA-2160_2015-05-06_16:31:48.patch


 With purgatory usage in consumer coordinator, it will be common that watcher 
 lists are very short and live only for a short time. So we'd better clean 
 them from the watchersForKey Pool once the list become empty in 
 checkAndComplete() calls. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2163) Offsets manager cache should prevent stale-offset-cleanup while an offset load is in progress; otherwise we can lose consumer offsets

2015-05-06 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531529#comment-14531529
 ] 

Joel Koshy commented on KAFKA-2163:
---

Created reviewboard https://reviews.apache.org/r/33916/diff/
 against branch origin/trunk

 Offsets manager cache should prevent stale-offset-cleanup while an offset 
 load is in progress; otherwise we can lose consumer offsets
 -

 Key: KAFKA-2163
 URL: https://issues.apache.org/jira/browse/KAFKA-2163
 Project: Kafka
  Issue Type: Bug
Reporter: Joel Koshy
 Fix For: 0.8.3

 Attachments: KAFKA-2163.patch


 When leadership of an offsets partition moves, the new leader loads offsets 
 from that partition into the offset manager cache.
 Independently, the offset manager has a periodic cleanup task for stale 
 offsets that removes old offsets from the cache and appends tombstones for 
 those. If the partition happens to contain much older offsets (earlier in the 
 log) and inserts those into the cache; the cleanup task may run and see those 
 offsets (which it deems to be stale) and proceeds to remove from the cache 
 and append a tombstone to the end of the log. The tombstone will override the 
 true latest offset and a subsequent offset fetch request will return no 
 offset.
 We just need to prevent the cleanup task from running during an offset load.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33065: Patch for KAFKA-1928

2015-05-06 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
---


Thanks for the patch. Looks great overall! Some comments below.

Also, there seems to be compilation error when running tests.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133525

Instead of doing the id to string conversion every time, perhaps we can 
compute the string of the id once in Node and expose it through an idString() 
method?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133526

Perhaps changing nodeId to node.



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://reviews.apache.org/r/33065/#comment133527

Perhaps changing id to node?



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
https://reviews.apache.org/r/33065/#comment133529

Since we always use a 4 byte size, perhaps we should make both remaining() 
and writeTo() return int?



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment133564

Not sure why we need to pass in expectedBytesToWrite. This can be computed 
from the sends.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
https://reviews.apache.org/r/33065/#comment133532

Instead of moving back the cursor in the iterator, would it be simpler to 
maintain a current Send that's being iterated on?



clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
https://reviews.apache.org/r/33065/#comment133534

request size probably should be renamed to receive size since it can be for 
either request or response.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133512

There is a followup patch in KAFKA-1282 that we haven't incorporated. 
Bascially, the linked hash map needs to be in access order. Since we are moving 
code around, could you incorporate the change here?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133537

Instead of -1, perhaps we can reuse NetworkReceive.UNLIMITED?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133541

Could we just iterate the keySet directly instead of making a copy first?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133543

It's reasonable to have a size for Send. The server size only records sent 
bytes, instead of messages.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133544

In this case, we should probably just propagate the exception and 
potentially kill the caller, instead of continue, right?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
https://reviews.apache.org/r/33065/#comment133548

Perhaps we could pass in connectionMaxIdleNanos to the constructor of 
Selector. Then this method can be private. For clients, we can default to max 
long so that they could close idle connections since the broker is doing that. 
The broker can pass in the connectionMaxIdleNanos from config.



core/src/main/scala/kafka/network/RequestChannel.scala
https://reviews.apache.org/r/33065/#comment133506

We probably don't need remoteAddress any more since that's part of 
connectionId now.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133395

Could we define both methods as override?



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133567

Currently, Selector exposes metrics per connection. While this works fine 
for producer/consumer clients since the number of brokers is typically small, I 
am concerned about having those on the server since there could be 10s of 
thousands of connections in a broker.

Perhaps we can pass in a config to disable per node metric in Selector when 
used in the broker.

We probably should think through if there is any metrics in Selector that 
we want to expose in Coda Hale metrics.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133500

It seems that we don't need startSelectTime. We will need to expose the 
io-wait-ratio metric in selector to networkProcessor.IdlePercent. This can be 
done in a followup jira though if the patch is too big.



core/src/main/scala/kafka/network/SocketServer.scala
https://reviews.apache.org/r/33065/#comment133495

No need for return value.



core/src/main/scala/kafka/network/SocketServer.scala

[jira] [Updated] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino updated KAFKA-2175:
---
Status: Patch Available  (was: Open)

 Reduce server log verbosity at info level
 -

 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Todd Palino
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-2175.patch


 Currently, the broker logs two messages at INFO level that should be at a 
 lower level. This serves only to fill up log files on disk, and can cause 
 performance issues due to synchronous logging as well.
 The first is the Closing socket connection message when there is no error. 
 This should be reduced to debug level. The second is the message that ZkUtil 
 writes when updating the partition reassignment JSON. This message contains 
 the entire JSON blob and should never be written at info level. In addition, 
 there is already a message in the controller log stating that the ZK node has 
 been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Todd Palino reassigned KAFKA-2175:
--

Assignee: Todd Palino  (was: Neha Narkhede)

 Reduce server log verbosity at info level
 -

 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Todd Palino
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-2175.patch


 Currently, the broker logs two messages at INFO level that should be at a 
 lower level. This serves only to fill up log files on disk, and can cause 
 performance issues due to synchronous logging as well.
 The first is the Closing socket connection message when there is no error. 
 This should be reduced to debug level. The second is the message that ZkUtil 
 writes when updating the partition reassignment JSON. This message contains 
 the entire JSON blob and should never be written at info level. In addition, 
 there is already a message in the controller log stating that the ZK node has 
 been updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2175) Reduce server log verbosity at info level

2015-05-06 Thread Todd Palino (JIRA)
Todd Palino created KAFKA-2175:
--

 Summary: Reduce server log verbosity at info level
 Key: KAFKA-2175
 URL: https://issues.apache.org/jira/browse/KAFKA-2175
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.8.3
Reporter: Todd Palino
Assignee: Neha Narkhede
Priority: Minor


Currently, the broker logs two messages at INFO level that should be at a lower 
level. This serves only to fill up log files on disk, and can cause performance 
issues due to synchronous logging as well.

The first is the Closing socket connection message when there is no error. 
This should be reduced to debug level. The second is the message that ZkUtil 
writes when updating the partition reassignment JSON. This message contains the 
entire JSON blob and should never be written at info level. In addition, there 
is already a message in the controller log stating that the ZK node has been 
updated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-06 Thread Guozhang Wang
Jiangjie,

Just trying to figure the class reference hierarchy of this approach (A -
B means A either has a member variable of B or take B as API parameter).

Metadata will have interface that takes in KafkaClient as a parameter, so
Metadata - KafkaClient

1. For producer:

KafkaProducer - Sender, KafkaProducer - Metadata, Sender - KafkaClient,
Sender - Metadata

2. For consumer:

KafkaConsumer - Coordinator, KafkaConsumer - Fetcher, KafkaConsumer -
KafkaClient, KafkaConsumer - Metadata, Coordinator - KafkaClient,
Coordinator - Metadata, Fetcher - KafkaClient, Fetcher - Metadata,

3. For controller:

KafkaController - KafkaClient

4. For replica fetcher:

ReplicaFetcher - KafkaClient

For producer / consumer, the interleaving seems a bit complicated to me.
Instead, we could completely remove the concept of Metadata from
KafkaClient, such that NetworkClient.handleCompletedReceives does not
specifically handle metadata responses, but just call
response.request().callback().onComplete(response) as well, which will try
to update the metadata and check for any errors.

Guozhang

On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Jun  Ewen,

 Thanks a lot for the comments. Both of them look better than my original
 plan.

 Jun, one downside about not changing NetworkClient but use a different
 metadata implementation is that NetworkClient will still have all the
 metadata related code there which makes it a little bit weird. I think
 Ewen¹s approach solve this problem.

 Ewen, If I understand correctly, you are proposing something similar to
 the structure we are using in
 AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That
 will make NetworkClient look clean but increase the layers which is
 probably OK.

 Inspired by your suggestions. I have another thought which seems closer to
 Jun¹s idea. What if we move maybeUpdateMetadata()/handleMetadataResponse()
 and related logic in NetworkClient to metadata and pass in NetworkClient
 as an argument. Like Jun suggested, we need a Metadata interface and
 different implementations.

 Thanks.

 Jiangjie (Becket) Qin



 On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 +1 on trying to reuse the NetworkClient code.
 
 I think Jun's approach could work, but I'm wondering if refactoring a bit
 could get better separation of concerns without a somewhat awkward nop
 implementation of Metadata. I'm not sure what combination of delegation or
 subclassing makes sense yet, but here's another approach that I think
 could
 work:
 
 * Get rid of metadata stuff from NetworkClient. Add a subclass that also
 manages all the metadata. (Since it's used for both producer and consumer,
 the obvious name that I first jumped to is ClientNetworkClient, but
 somehow
 I think we can come up with something less confusing.)
 * leastLoadedNode is the only caller of metadata.fetch() in that class,
 maybeUpdateMetadata is the only caller of leastLoadedNode,
 maybeUpdateMetadata is only called in poll when a combination of metadata
 related timeouts end up being 0. These can be safely refactored into the
 subclass with one override of poll(). Same with metadataFetchInProgress
 assuming the rest of the changes below.
 * Some of the default implementations (e.g. handleMetadataResponse) can be
 left nops in NetworkClient and moved to the subclass.
 * Others can be overridden to call the super method then take the
 additional action necessary (e.g., on disconnect, move the metadata update
 request to the subclass).
 * Making the timeout handling in poll() work for both NetworkClient and
 the
 new base class might be the messiest part and might require breaking down
 the implementation of poll into multiple methods.
 * isReady uses metadataFetchInProgress and gets a timeout from the
 Metadata
 class. We can just override this method as well, though I feel like
 there's
 probably a cleaner solution.
 
 -Ewen
 
 
 On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:
 
  Hi, Jiangjie,
 
  Thanks for taking on this.
 
  I was thinking that one way to decouple the dependency on Metadata in
  NetworkClient is the following.
  1. Make Metadata an interface.
  2. Rename current Metadata class to sth like KafkaMetadata that
 implements
  the Metadata interface.
  3. Have a new NoOpMetadata class that implements the Metadata interface.
  This class
  3.1 does nothing for any write method
  3.2 returns max long for any method that asks for a timestamp
  3.3. returns an empty Cluster for fetch().
 
  Then we can leave NetworkClient unchanged and just pass in a
 NoOpMetadata
  when using NetworkClient in the controller. The consumer/producer client
  will be using KafkaMetadata.
 
  As for replica fetchers, it may be possible to use KafkaConsumer.
 However,
  we don't need the metadata and the offset management. So, perhaps it's
  easier to just use NetworkClient. Also, currently, there is one replica
  fetcher thread per source broker. 

Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-06 Thread Ewen Cheslack-Postava
On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Jun  Ewen,

 Thanks a lot for the comments. Both of them look better than my original
 plan.

 Jun, one downside about not changing NetworkClient but use a different
 metadata implementation is that NetworkClient will still have all the
 metadata related code there which makes it a little bit weird. I think
 Ewen¹s approach solve this problem.

 Ewen, If I understand correctly, you are proposing something similar to
 the structure we are using in
 AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That
 will make NetworkClient look clean but increase the layers which is
 probably OK.


Yes, same basic idea.



 Inspired by your suggestions. I have another thought which seems closer to
 Jun¹s idea. What if we move maybeUpdateMetadata()/handleMetadataResponse()
 and related logic in NetworkClient to metadata and pass in NetworkClient
 as an argument. Like Jun suggested, we need a Metadata interface and
 different implementations.


This could work too. I think it ends up with the new Metadata interface and
NetworkClient being more coupled than layered. What I was hoping to do was
to remove the concept of Metadata entirely from NetworkClient so it only
deals with Nodes and is focused on the request/response handling. But that
might require introducing a listener interface (if Metadata was refactored
to be passed a NetworkClient it would use to initiate metadata refreshes)
or provide the right hooks so that behavior can be overridden (if Metadata
was to subclass NetworkClient).

I think all these options could work and it's not clear to me which will
work out best in separating the code, so I'd just try one of them and see
how the patch looks.

-Ewen



 Thanks.

 Jiangjie (Becket) Qin



 On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 +1 on trying to reuse the NetworkClient code.
 
 I think Jun's approach could work, but I'm wondering if refactoring a bit
 could get better separation of concerns without a somewhat awkward nop
 implementation of Metadata. I'm not sure what combination of delegation or
 subclassing makes sense yet, but here's another approach that I think
 could
 work:
 
 * Get rid of metadata stuff from NetworkClient. Add a subclass that also
 manages all the metadata. (Since it's used for both producer and consumer,
 the obvious name that I first jumped to is ClientNetworkClient, but
 somehow
 I think we can come up with something less confusing.)
 * leastLoadedNode is the only caller of metadata.fetch() in that class,
 maybeUpdateMetadata is the only caller of leastLoadedNode,
 maybeUpdateMetadata is only called in poll when a combination of metadata
 related timeouts end up being 0. These can be safely refactored into the
 subclass with one override of poll(). Same with metadataFetchInProgress
 assuming the rest of the changes below.
 * Some of the default implementations (e.g. handleMetadataResponse) can be
 left nops in NetworkClient and moved to the subclass.
 * Others can be overridden to call the super method then take the
 additional action necessary (e.g., on disconnect, move the metadata update
 request to the subclass).
 * Making the timeout handling in poll() work for both NetworkClient and
 the
 new base class might be the messiest part and might require breaking down
 the implementation of poll into multiple methods.
 * isReady uses metadataFetchInProgress and gets a timeout from the
 Metadata
 class. We can just override this method as well, though I feel like
 there's
 probably a cleaner solution.
 
 -Ewen
 
 
 On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:
 
  Hi, Jiangjie,
 
  Thanks for taking on this.
 
  I was thinking that one way to decouple the dependency on Metadata in
  NetworkClient is the following.
  1. Make Metadata an interface.
  2. Rename current Metadata class to sth like KafkaMetadata that
 implements
  the Metadata interface.
  3. Have a new NoOpMetadata class that implements the Metadata interface.
  This class
  3.1 does nothing for any write method
  3.2 returns max long for any method that asks for a timestamp
  3.3. returns an empty Cluster for fetch().
 
  Then we can leave NetworkClient unchanged and just pass in a
 NoOpMetadata
  when using NetworkClient in the controller. The consumer/producer client
  will be using KafkaMetadata.
 
  As for replica fetchers, it may be possible to use KafkaConsumer.
 However,
  we don't need the metadata and the offset management. So, perhaps it's
  easier to just use NetworkClient. Also, currently, there is one replica
  fetcher thread per source broker. By using NetworkClient, we can change
  that to using a single thread for all source brokers. This is probably a
  bigger change. So, maybe we can do it later.
 
  Jun
 
 
  I think we probably need to replace replica fetcher with NetworkClient
 as
  well. Replica fetcher gets leader from the controller and therefore
 doesn't
 
 

Need Access to Wiki Page To Create Page for Discussion

2015-05-06 Thread Bhavesh Mistry
Hi All,

I need access to create Discussion or KIP document.  Let me know what is
process of getting access.

Thanks,

Bhavesh


[jira] [Resolved] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)

2015-05-06 Thread Sampath Reddy Lambu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sampath Reddy Lambu resolved KAFKA-2027.

Resolution: Invalid

 kafka never notifies the zookeeper client when a partition moved with due to 
 an auto-rebalance (when auto.leader.rebalance.enable=true)
 ---

 Key: KAFKA-2027
 URL: https://issues.apache.org/jira/browse/KAFKA-2027
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
 Environment: Kafka 0.8.1.1, Node.js  Mac OS
Reporter: Sampath Reddy Lambu
Assignee: Neha Narkhede
Priority: Blocker

 I would like report an issue when auto.leader.rebalance.enable=true. Kafka 
 never sends an event/notification to its zookeeper client after preferred 
 election complete. This works fine with manual rebalance from CLI 
 (kafka-preferred-replica-election.sh).
 Initially i thought this issue was with Kafka-Node, but its not. 
 An event should be emitted from zookeeper if any partition moved while 
 preferred election.
 Im working with kafka_2.9.2-0.8.1.1 (Broker's)  Kafka-Node(Node.JS).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)

2015-05-06 Thread Sampath Reddy Lambu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sampath Reddy Lambu closed KAFKA-2027.
--

 kafka never notifies the zookeeper client when a partition moved with due to 
 an auto-rebalance (when auto.leader.rebalance.enable=true)
 ---

 Key: KAFKA-2027
 URL: https://issues.apache.org/jira/browse/KAFKA-2027
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
 Environment: Kafka 0.8.1.1, Node.js  Mac OS
Reporter: Sampath Reddy Lambu
Assignee: Neha Narkhede
Priority: Blocker

 I would like report an issue when auto.leader.rebalance.enable=true. Kafka 
 never sends an event/notification to its zookeeper client after preferred 
 election complete. This works fine with manual rebalance from CLI 
 (kafka-preferred-replica-election.sh).
 Initially i thought this issue was with Kafka-Node, but its not. 
 An event should be emitted from zookeeper if any partition moved while 
 preferred election.
 Im working with kafka_2.9.2-0.8.1.1 (Broker's)  Kafka-Node(Node.JS).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)

2015-05-06 Thread Sampath Reddy Lambu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531073#comment-14531073
 ] 

Sampath Reddy Lambu commented on KAFKA-2027:


this was handled in Kafka-Node project in 0.2.25 version. closing this issue.
Thank you for your response.

-Sampath

 kafka never notifies the zookeeper client when a partition moved with due to 
 an auto-rebalance (when auto.leader.rebalance.enable=true)
 ---

 Key: KAFKA-2027
 URL: https://issues.apache.org/jira/browse/KAFKA-2027
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
 Environment: Kafka 0.8.1.1, Node.js  Mac OS
Reporter: Sampath Reddy Lambu
Assignee: Neha Narkhede
Priority: Blocker

 I would like report an issue when auto.leader.rebalance.enable=true. Kafka 
 never sends an event/notification to its zookeeper client after preferred 
 election complete. This works fine with manual rebalance from CLI 
 (kafka-preferred-replica-election.sh).
 Initially i thought this issue was with Kafka-Node, but its not. 
 An event should be emitted from zookeeper if any partition moved while 
 preferred election.
 Im working with kafka_2.9.2-0.8.1.1 (Broker's)  Kafka-Node(Node.JS).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-06 Thread Jiangjie Qin
Jun  Ewen,

Thanks a lot for the comments. Both of them look better than my original
plan.

Jun, one downside about not changing NetworkClient but use a different
metadata implementation is that NetworkClient will still have all the
metadata related code there which makes it a little bit weird. I think
Ewen¹s approach solve this problem.

Ewen, If I understand correctly, you are proposing something similar to
the structure we are using in
AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That
will make NetworkClient look clean but increase the layers which is
probably OK.

Inspired by your suggestions. I have another thought which seems closer to
Jun¹s idea. What if we move maybeUpdateMetadata()/handleMetadataResponse()
and related logic in NetworkClient to metadata and pass in NetworkClient
as an argument. Like Jun suggested, we need a Metadata interface and
different implementations.

Thanks.

Jiangjie (Becket) Qin



On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

+1 on trying to reuse the NetworkClient code.

I think Jun's approach could work, but I'm wondering if refactoring a bit
could get better separation of concerns without a somewhat awkward nop
implementation of Metadata. I'm not sure what combination of delegation or
subclassing makes sense yet, but here's another approach that I think
could
work:

* Get rid of metadata stuff from NetworkClient. Add a subclass that also
manages all the metadata. (Since it's used for both producer and consumer,
the obvious name that I first jumped to is ClientNetworkClient, but
somehow
I think we can come up with something less confusing.)
* leastLoadedNode is the only caller of metadata.fetch() in that class,
maybeUpdateMetadata is the only caller of leastLoadedNode,
maybeUpdateMetadata is only called in poll when a combination of metadata
related timeouts end up being 0. These can be safely refactored into the
subclass with one override of poll(). Same with metadataFetchInProgress
assuming the rest of the changes below.
* Some of the default implementations (e.g. handleMetadataResponse) can be
left nops in NetworkClient and moved to the subclass.
* Others can be overridden to call the super method then take the
additional action necessary (e.g., on disconnect, move the metadata update
request to the subclass).
* Making the timeout handling in poll() work for both NetworkClient and
the
new base class might be the messiest part and might require breaking down
the implementation of poll into multiple methods.
* isReady uses metadataFetchInProgress and gets a timeout from the
Metadata
class. We can just override this method as well, though I feel like
there's
probably a cleaner solution.

-Ewen


On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:

 Hi, Jiangjie,

 Thanks for taking on this.

 I was thinking that one way to decouple the dependency on Metadata in
 NetworkClient is the following.
 1. Make Metadata an interface.
 2. Rename current Metadata class to sth like KafkaMetadata that
implements
 the Metadata interface.
 3. Have a new NoOpMetadata class that implements the Metadata interface.
 This class
 3.1 does nothing for any write method
 3.2 returns max long for any method that asks for a timestamp
 3.3. returns an empty Cluster for fetch().

 Then we can leave NetworkClient unchanged and just pass in a
NoOpMetadata
 when using NetworkClient in the controller. The consumer/producer client
 will be using KafkaMetadata.

 As for replica fetchers, it may be possible to use KafkaConsumer.
However,
 we don't need the metadata and the offset management. So, perhaps it's
 easier to just use NetworkClient. Also, currently, there is one replica
 fetcher thread per source broker. By using NetworkClient, we can change
 that to using a single thread for all source brokers. This is probably a
 bigger change. So, maybe we can do it later.

 Jun


 I think we probably need to replace replica fetcher with NetworkClient
as
 well. Replica fetcher gets leader from the controller and therefore
doesn't

 On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

  I am trying to see if we can reuse the NetworkClient class to be used
in
  controller to broker communication. (Also, we can probably use
  KafkaConsumer which is already using NetworkClient in replica
fetchers).
  Currently NetworkClient does the following things in addition to
sending
  requests.
 
1.  Connection state management.
2.  Flow control (inflight requests)
3.  Metadata refresh
 
  In controller we need (1) and (2) but not (3). NetworkClient is
tightly
  coupled with metadata now and this is the major blocker of reusing
  NetworkClient in controller.  For controller, we don¹t need
NetworkClient
  to manage any metadata because the controller has listeners to monitor
 the
  cluster state and has all the information about topic metadata.
  I am thinking we can add a disable metadata refresh flag to
NetworkClient
  or set 

Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient

2015-05-06 Thread Jun Rao
Jiangjie,

Yes, I think using metadata timeout to expire batches in the record
accumulator makes sense.

Thanks,

Jun

On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
 up on this KIP because currently we experience mirror-maker hung very
 likely when a broker is down.

 I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
 timeout to expire the batches which are sitting in accumulator without
 leader info. I did that because the situation there is essentially missing
 metadata.

 As a summary of what I am thinking about the timeout in new Producer:

 1. Metadata timeout:
   - used in send(), blocking
   - used in accumulator to expire batches with timeout exception.
 2. Linger.ms
   - Used in accumulator to ready the batch for drain
 3. Request timeout
   - Used in NetworkClient to expire a batch and retry if no response is
 received for a request before timeout.

 So in this KIP, we only address (3). The only public interface change is a
 new configuration of request timeout (and maybe change the configuration
 name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).

 Would like to see what people think of above approach?

 Jiangjie (Becket) Qin

 On 4/20/15, 6:02 PM, Jiangjie Qin j...@linkedin.com wrote:

 Jun,
 
 I thought a little bit differently on this.
 Intuitively, I am thinking that if a partition is offline, the metadata
 for that partition should be considered not ready because we don’t know
 which broker we should send the message to. So those sends need to be
 blocked on metadata timeout.
 Another thing I’m wondering is in which scenario an offline partition will
 become online again in a short period of time and how likely it will
 occur. My understanding is that the batch timeout for batches sitting in
 accumulator should be larger than linger.ms but should not be too long
 (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
 with batches to be aborted.
 
 That said, I do agree it is reasonable to buffer the message for some time
 so messages to other partitions can still get sent. But adding another
 expiration in addition to linger.ms - which is essentially a timeout -
 sounds a little bit confusing. Maybe we can do this, let the batch sit in
 accumulator up to linger.ms, then fail it if necessary.
 
 What do you think?
 
 Thanks,
 
 Jiangjie (Becket) Qin
 
 On 4/20/15, 1:11 PM, Jun Rao j...@confluent.io wrote:
 
 Jiangjie,
 
 Allowing messages to be accumulated in an offline partition could be
 useful
 since the partition may become available before the request timeout or
 linger time is reached. Now that we are planning to add a new timeout, it
 would be useful to think through whether/how that applies to messages in
 the accumulator too.
 
 Thanks,
 
 Jun
 
 
 On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin j...@linkedin.com.invalid
 
 wrote:
 
  Hi Harsha,
 
  Took a quick look at the patch. I think it is still a little bit
  different. KAFKA-1788 only handles the case where a batch sitting in
  accumulator for too long. The KIP is trying to solve the issue where a
  batch has already been drained from accumulator and sent to broker.
  We might be able to apply timeout on batch level to merge those two
 cases
  as Ewen suggested. But I’m not sure if it is a good idea to allow
 messages
  whose target partition is offline to sit in accumulator in the first
 place.
 
  Jiangjie (Becket) Qin
 
  On 4/16/15, 10:19 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:
 
  Guozhang and Jiangjie,
   Isn’t this work being covered in
  https://issues.apache.org/jira/browse/KAFKA-1788 . Can you please the
  review the patch there.
  Thanks,
  Harsha
  
  
  On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
  wrote:
  
  Thanks for the update Jiangjie,
  
  I think it is actually NOT expected that hardware disconnection will
 be
  detected by the selector, but rather will only be revealed upon TCP
  timeout, which could be hours.
  
  A couple of comments on the wiki:
  
  1. For KafkaProducer.close() and KafkaProducer.flush() we need the
  request
  timeout as implict timeout. I am not very clear what does this mean?
  
  2. Currently the producer already has a TIMEOUT_CONFIG which should
  really be REPLICATION_TIMEOUT_CONFIG. So if we decide to add 
  REQUEST_TIMEOUT_CONFIG, I suggest we also make this renaming:
 admittedly
  
  it will change the config names but will reduce confusions moving
  forward.
  
  
  Guozhang
  
  
  On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
  wrote:
  
   Checked the code again. It seems that the disconnected channel is
 not
   detected by selector as expected.
  
   Currently we are depending on the
   o.a.k.common.network.Selector.disconnected set to see if we need to
 do
   something for a disconnected channel.
   However Selector.disconnected set is only 

Re: [DISCUSS] KIP-21 Configuration Management

2015-05-06 Thread Ashish Singh
Hey Jun,

Where does the broker get the info, which zk it needs to talk to?

On Wednesday, May 6, 2015, Jun Rao j...@confluent.io wrote:

 Ashish,

 3. Just want to clarify. Why can't you store ZK connection config in ZK?
 This is a property for ZK clients, not ZK server.

 Thanks,

 Jun

 On Wed, May 6, 2015 at 5:48 PM, Ashish Singh asi...@cloudera.com
 javascript:; wrote:

  I too would like to share some concerns that we came up with while
  discussing the effect of moving configs to zookeeper will have.
 
  1. Kafka will start to become a configuration management tool to some
  degree, and be subject to all the things such tools are commonly asked to
  do. Kafka'll likely need to re-implement the role / group / service
  hierarchy that CM uses. Kafka'll need some way to conveniently dump its
  configs so they can be re-imported later, as a backup tool. People will
  want this to be audited, which means you'd need distinct logins for
  different people, and user management. You can try to push some of this
  stuff onto tools like CM, but this is Kafka going out of its way to be
  difficult to manage, and most projects don't want to do that. Being
 unique
  in how configuration is done is strictly a bad thing for both integration
  and usability. Probably lots of other stuff. Seems like a bad direction.
 
  2. Where would the default config live? If we decide on keeping the
 config
  files around just for getting the default config, then I think on
 restart,
  the config file will be ignored. This creates an obnoxious asymmetry for
  how to configure Kafka the first time and how you update it. You have to
  learn 2 ways of making config changes. If there was a mistake in your
  original config file, you can't just edit the config file and restart,
 you
  have to go through the API. Reading configs is also more irritating. This
  all creates a learning curve for users of Kafka that will make it harder
 to
  use than other projects. This is also a backwards-incompatible change.
 
  3. All Kafka configs living in ZK is strictly impossible, since at the
 very
  least ZK connection configs cannot be stored in ZK. So you will have a
 file
  where some values are in effect but others are not, which is again
  confusing. Also, since you are still reading the config file on first
  start, there are still multiple sources of truth, or at least the
  appearance of such to the user.
 
  On Wed, May 6, 2015 at 5:33 PM, Jun Rao j...@confluent.io javascript:;
 wrote:
 
   One of the Chef users confirmed that Chef integration could still work
 if
   all configs are moved to ZK. My rough understanding of how Chef works
 is
   that a user first registers a service host with a Chef server. After
  that,
   a Chef client will be run on the service host. The user can then push
   config changes intended for a service/host to the Chef server. The
 server
   is then responsible for pushing the changes to Chef clients. Chef
 clients
   support pluggable logic. For example, it can generate a config file
 that
   Kafka broker will take. If we move all configs to ZK, we can customize
  the
   Chef client to use our config CLI to make the config changes in Kafka.
 In
   this model, one probably doesn't need to register every broker in Chef
  for
   the config push. Not sure if Puppet works in a similar way.
  
   Also for storing the configs, we probably can't store the broker/global
   level configs in Kafka itself (e.g. in a special topic). The reason is
  that
   in order to start a broker, we likely need to make some broker level
  config
   changes (e.g., the default log.dir may not be present, the default port
  may
   not be available, etc). If we need a broker to be up to make those
  changes,
   we get into this chicken and egg problem.
  
   Thanks,
  
   Jun
  
   On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com
 javascript:;
   wrote:
  
Sorry I missed the call today :)
   
I think an additional requirement would be:
Make sure that traditional deployment tools (Puppet, Chef, etc) are
  still
capable of managing Kafka configuration.
   
For this reason, I'd like the configuration refresh to be pretty
 close
  to
what most Linux services are doing to force a reload of
 configuration.
AFAIK, this involves handling HUP signal in the main thread to reload
configuration. Then packaging scripts can add something nice like
   service
kafka reload.
   
(See Apache web server:
https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101
 )
   
Gwen
   
   
On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com
 javascript:;
  wrote:
   
 Good discussion. Since we will be talking about this at 11am, I
  wanted
 to organize these comments into requirements to see if we are all
 on
 the same page.

 REQUIREMENT 1: Needs to accept dynamic config changes. This needs
 to
 be general enough to work for all configs that we 

Re: Need Access to Wiki Page To Create Page for Discussion

2015-05-06 Thread Bhavesh Mistry
Hi Jun,

The account id is Bmis13.

Thanks,
Bhavesh

On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote:

 What your wiki user id?

 Thanks,

 Jun

 On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
 wrote:

  Hi All,
 
  I need access to create Discussion or KIP document.  Let me know what is
  process of getting access.
 
  Thanks,
 
  Bhavesh
 



Re: Review Request 32650: Patch for KAFKA-2000

2015-05-06 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32650/#review82775
---


Thanks for the updated patch.


core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/32650/#comment133593

Sorry I didn't notice this earlier. This message is now slightly incorrect. 
Can we get a break-up of the number of offsets deleted due to expiration and 
due to topic deletion? BTW I'm touching this in KAFKA-2163 as well (which you 
may want to check out).



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133586

Pre-existing issue, but could you rename this to OffsetManagementTest?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133587

testOffsetsDeletedAfterTopicDeletion



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133588

Can you use the more recent commit version (which has an explicit retention 
time?)



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/32650/#comment133590

Can you also commit offsets for some other topic and verify that those 
offsets are _not_ deleted?

As mentioned in the earlier RB, for the first scenario, we depend on the 
condition that the UpdateMetadataRequest is sent first. It would be good to 
have a unit test that explicitly tests this so we never unknowingly break that 
assumption. I don't have a good way to test this though :( If you have any 
ideas that would be great. Part of the issue is we have little to no test 
coverage on the controller.


- Joel Koshy


On May 3, 2015, 5:39 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32650/
 ---
 
 (Updated May 3, 2015, 5:39 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2000
 https://issues.apache.org/jira/browse/KAFKA-2000
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2000. Delete consumer offsets from kafka once the topic is deleted.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 18680ce100f10035175cc0263ba7787ab0f6a17a 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 652208a70f66045b854549d93cbbc2b77c24b10b 
 
 Diff: https://reviews.apache.org/r/32650/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: [DISCUSS] KIP-21 Configuration Management

2015-05-06 Thread Jun Rao
Ashish,

3. Just want to clarify. Why can't you store ZK connection config in ZK?
This is a property for ZK clients, not ZK server.

Thanks,

Jun

On Wed, May 6, 2015 at 5:48 PM, Ashish Singh asi...@cloudera.com wrote:

 I too would like to share some concerns that we came up with while
 discussing the effect of moving configs to zookeeper will have.

 1. Kafka will start to become a configuration management tool to some
 degree, and be subject to all the things such tools are commonly asked to
 do. Kafka'll likely need to re-implement the role / group / service
 hierarchy that CM uses. Kafka'll need some way to conveniently dump its
 configs so they can be re-imported later, as a backup tool. People will
 want this to be audited, which means you'd need distinct logins for
 different people, and user management. You can try to push some of this
 stuff onto tools like CM, but this is Kafka going out of its way to be
 difficult to manage, and most projects don't want to do that. Being unique
 in how configuration is done is strictly a bad thing for both integration
 and usability. Probably lots of other stuff. Seems like a bad direction.

 2. Where would the default config live? If we decide on keeping the config
 files around just for getting the default config, then I think on restart,
 the config file will be ignored. This creates an obnoxious asymmetry for
 how to configure Kafka the first time and how you update it. You have to
 learn 2 ways of making config changes. If there was a mistake in your
 original config file, you can't just edit the config file and restart, you
 have to go through the API. Reading configs is also more irritating. This
 all creates a learning curve for users of Kafka that will make it harder to
 use than other projects. This is also a backwards-incompatible change.

 3. All Kafka configs living in ZK is strictly impossible, since at the very
 least ZK connection configs cannot be stored in ZK. So you will have a file
 where some values are in effect but others are not, which is again
 confusing. Also, since you are still reading the config file on first
 start, there are still multiple sources of truth, or at least the
 appearance of such to the user.

 On Wed, May 6, 2015 at 5:33 PM, Jun Rao j...@confluent.io wrote:

  One of the Chef users confirmed that Chef integration could still work if
  all configs are moved to ZK. My rough understanding of how Chef works is
  that a user first registers a service host with a Chef server. After
 that,
  a Chef client will be run on the service host. The user can then push
  config changes intended for a service/host to the Chef server. The server
  is then responsible for pushing the changes to Chef clients. Chef clients
  support pluggable logic. For example, it can generate a config file that
  Kafka broker will take. If we move all configs to ZK, we can customize
 the
  Chef client to use our config CLI to make the config changes in Kafka. In
  this model, one probably doesn't need to register every broker in Chef
 for
  the config push. Not sure if Puppet works in a similar way.
 
  Also for storing the configs, we probably can't store the broker/global
  level configs in Kafka itself (e.g. in a special topic). The reason is
 that
  in order to start a broker, we likely need to make some broker level
 config
  changes (e.g., the default log.dir may not be present, the default port
 may
  not be available, etc). If we need a broker to be up to make those
 changes,
  we get into this chicken and egg problem.
 
  Thanks,
 
  Jun
 
  On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Sorry I missed the call today :)
  
   I think an additional requirement would be:
   Make sure that traditional deployment tools (Puppet, Chef, etc) are
 still
   capable of managing Kafka configuration.
  
   For this reason, I'd like the configuration refresh to be pretty close
 to
   what most Linux services are doing to force a reload of configuration.
   AFAIK, this involves handling HUP signal in the main thread to reload
   configuration. Then packaging scripts can add something nice like
  service
   kafka reload.
  
   (See Apache web server:
   https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)
  
   Gwen
  
  
   On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
  
Good discussion. Since we will be talking about this at 11am, I
 wanted
to organize these comments into requirements to see if we are all on
the same page.
   
REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
be general enough to work for all configs that we envision may need
 to
accept changes at runtime. e.g., log (topic), broker, client
 (quotas),
etc.. possible options include:
- ZooKeeper watcher
- Kafka topic
- Direct RPC to controller (or config coordinator)
   
The current KIP is really focused on REQUIREMENT 1 and I think that
 is

Re: [DISCUSS] KIP-21 Configuration Management

2015-05-06 Thread Ashish Singh
I too would like to share some concerns that we came up with while
discussing the effect of moving configs to zookeeper will have.

1. Kafka will start to become a configuration management tool to some
degree, and be subject to all the things such tools are commonly asked to
do. Kafka'll likely need to re-implement the role / group / service
hierarchy that CM uses. Kafka'll need some way to conveniently dump its
configs so they can be re-imported later, as a backup tool. People will
want this to be audited, which means you'd need distinct logins for
different people, and user management. You can try to push some of this
stuff onto tools like CM, but this is Kafka going out of its way to be
difficult to manage, and most projects don't want to do that. Being unique
in how configuration is done is strictly a bad thing for both integration
and usability. Probably lots of other stuff. Seems like a bad direction.

2. Where would the default config live? If we decide on keeping the config
files around just for getting the default config, then I think on restart,
the config file will be ignored. This creates an obnoxious asymmetry for
how to configure Kafka the first time and how you update it. You have to
learn 2 ways of making config changes. If there was a mistake in your
original config file, you can't just edit the config file and restart, you
have to go through the API. Reading configs is also more irritating. This
all creates a learning curve for users of Kafka that will make it harder to
use than other projects. This is also a backwards-incompatible change.

3. All Kafka configs living in ZK is strictly impossible, since at the very
least ZK connection configs cannot be stored in ZK. So you will have a file
where some values are in effect but others are not, which is again
confusing. Also, since you are still reading the config file on first
start, there are still multiple sources of truth, or at least the
appearance of such to the user.

On Wed, May 6, 2015 at 5:33 PM, Jun Rao j...@confluent.io wrote:

 One of the Chef users confirmed that Chef integration could still work if
 all configs are moved to ZK. My rough understanding of how Chef works is
 that a user first registers a service host with a Chef server. After that,
 a Chef client will be run on the service host. The user can then push
 config changes intended for a service/host to the Chef server. The server
 is then responsible for pushing the changes to Chef clients. Chef clients
 support pluggable logic. For example, it can generate a config file that
 Kafka broker will take. If we move all configs to ZK, we can customize the
 Chef client to use our config CLI to make the config changes in Kafka. In
 this model, one probably doesn't need to register every broker in Chef for
 the config push. Not sure if Puppet works in a similar way.

 Also for storing the configs, we probably can't store the broker/global
 level configs in Kafka itself (e.g. in a special topic). The reason is that
 in order to start a broker, we likely need to make some broker level config
 changes (e.g., the default log.dir may not be present, the default port may
 not be available, etc). If we need a broker to be up to make those changes,
 we get into this chicken and egg problem.

 Thanks,

 Jun

 On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Sorry I missed the call today :)
 
  I think an additional requirement would be:
  Make sure that traditional deployment tools (Puppet, Chef, etc) are still
  capable of managing Kafka configuration.
 
  For this reason, I'd like the configuration refresh to be pretty close to
  what most Linux services are doing to force a reload of configuration.
  AFAIK, this involves handling HUP signal in the main thread to reload
  configuration. Then packaging scripts can add something nice like
 service
  kafka reload.
 
  (See Apache web server:
  https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)
 
  Gwen
 
 
  On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Good discussion. Since we will be talking about this at 11am, I wanted
   to organize these comments into requirements to see if we are all on
   the same page.
  
   REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
   be general enough to work for all configs that we envision may need to
   accept changes at runtime. e.g., log (topic), broker, client (quotas),
   etc.. possible options include:
   - ZooKeeper watcher
   - Kafka topic
   - Direct RPC to controller (or config coordinator)
  
   The current KIP is really focused on REQUIREMENT 1 and I think that is
   reasonable as long as we don't come up with something that requires
   significant re-engineering to support the other requirements.
  
   REQUIREMENT 2: Provide consistency of configs across brokers (modulo
   per-broker overrides) or at least be able to verify consistency.  What
   this effectively means is that config changes 

Re: Review Request 33378: Patch for KAFKA-2136

2015-05-06 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated May 7, 2015, 1:32 a.m.)


Review request for kafka and Joel Koshy.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Changes are:
- protocol changes to the fetch reuqest and response to return the 
throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a 
client
- Test cases

For now the patch will publish a zero delay and return a response

Added more tests


Addressing Jun's comments


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
31a2639477bf66f9a05d2b9b07794572d7ec393b 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
a439046e118b6efcc3a5a9d9e8acb79f85e40398 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
417960dd1ab407ebebad8fdb0e97415db3e91a2f 
  core/src/main/scala/kafka/server/OffsetManager.scala 
18680ce100f10035175cc0263ba7787ab0f6a17a 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



Re: [DISCUSS] KIP-21 Configuration Management

2015-05-06 Thread Ashish Singh
This raises one more concern. Just putting it out there.

4. What if one decides to move to a different zk, I am sure there are other
concerns in status quo for this question. However, this is adding to the
concern, right? Not sure though how much big of a concern it practically is.

On Wednesday, May 6, 2015, Ashish Singh asi...@cloudera.com wrote:

 Hey Jun,

 Where does the broker get the info, which zk it needs to talk to?

 On Wednesday, May 6, 2015, Jun Rao j...@confluent.io
 javascript:_e(%7B%7D,'cvml','j...@confluent.io'); wrote:

 Ashish,

 3. Just want to clarify. Why can't you store ZK connection config in ZK?
 This is a property for ZK clients, not ZK server.

 Thanks,

 Jun

 On Wed, May 6, 2015 at 5:48 PM, Ashish Singh asi...@cloudera.com wrote:

  I too would like to share some concerns that we came up with while
  discussing the effect of moving configs to zookeeper will have.
 
  1. Kafka will start to become a configuration management tool to some
  degree, and be subject to all the things such tools are commonly asked
 to
  do. Kafka'll likely need to re-implement the role / group / service
  hierarchy that CM uses. Kafka'll need some way to conveniently dump its
  configs so they can be re-imported later, as a backup tool. People will
  want this to be audited, which means you'd need distinct logins for
  different people, and user management. You can try to push some of this
  stuff onto tools like CM, but this is Kafka going out of its way to be
  difficult to manage, and most projects don't want to do that. Being
 unique
  in how configuration is done is strictly a bad thing for both
 integration
  and usability. Probably lots of other stuff. Seems like a bad direction.
 
  2. Where would the default config live? If we decide on keeping the
 config
  files around just for getting the default config, then I think on
 restart,
  the config file will be ignored. This creates an obnoxious asymmetry for
  how to configure Kafka the first time and how you update it. You have to
  learn 2 ways of making config changes. If there was a mistake in your
  original config file, you can't just edit the config file and restart,
 you
  have to go through the API. Reading configs is also more irritating.
 This
  all creates a learning curve for users of Kafka that will make it
 harder to
  use than other projects. This is also a backwards-incompatible change.
 
  3. All Kafka configs living in ZK is strictly impossible, since at the
 very
  least ZK connection configs cannot be stored in ZK. So you will have a
 file
  where some values are in effect but others are not, which is again
  confusing. Also, since you are still reading the config file on first
  start, there are still multiple sources of truth, or at least the
  appearance of such to the user.
 
  On Wed, May 6, 2015 at 5:33 PM, Jun Rao j...@confluent.io wrote:
 
   One of the Chef users confirmed that Chef integration could still
 work if
   all configs are moved to ZK. My rough understanding of how Chef works
 is
   that a user first registers a service host with a Chef server. After
  that,
   a Chef client will be run on the service host. The user can then push
   config changes intended for a service/host to the Chef server. The
 server
   is then responsible for pushing the changes to Chef clients. Chef
 clients
   support pluggable logic. For example, it can generate a config file
 that
   Kafka broker will take. If we move all configs to ZK, we can customize
  the
   Chef client to use our config CLI to make the config changes in
 Kafka. In
   this model, one probably doesn't need to register every broker in Chef
  for
   the config push. Not sure if Puppet works in a similar way.
  
   Also for storing the configs, we probably can't store the
 broker/global
   level configs in Kafka itself (e.g. in a special topic). The reason is
  that
   in order to start a broker, we likely need to make some broker level
  config
   changes (e.g., the default log.dir may not be present, the default
 port
  may
   not be available, etc). If we need a broker to be up to make those
  changes,
   we get into this chicken and egg problem.
  
   Thanks,
  
   Jun
  
   On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Sorry I missed the call today :)
   
I think an additional requirement would be:
Make sure that traditional deployment tools (Puppet, Chef, etc) are
  still
capable of managing Kafka configuration.
   
For this reason, I'd like the configuration refresh to be pretty
 close
  to
what most Linux services are doing to force a reload of
 configuration.
AFAIK, this involves handling HUP signal in the main thread to
 reload
configuration. Then packaging scripts can add something nice like
   service
kafka reload.
   
(See Apache web server:
   
 https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)
   
Gwen
   
   
On Tue, May 5, 2015 at 8:54 AM, 

Re: Review Request 33378: Patch for KAFKA-2136

2015-05-06 Thread Aditya Auradkar


 On May 4, 2015, 4:51 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/api/FetchResponse.scala, lines 152-153
  https://reviews.apache.org/r/33378/diff/1/?file=937083#file937083line152
 
  This is tricky since FetchRequest is used in the follower as well. When 
  doing a rolling upgrade of the broker to 0.8.3, we have to follow the 
  following steps.
  1. Configure intra.cluster.protocol to 0.8.2 and rolling upgrade each 
  broker to 0.8.3. After this step, each broker understands version 1 of the 
  fetch request, but still sends fetch request in version 0.
  2. Configure intra.cluster.protocol to 0.8.3 and restart each broker. 
  After this step, every broker will start sending fetch request in version 1.
  
  So, we need the logic in ReplicaFetcherThread to issue the right 
  version of the FetchRequest according to intra.cluster.protocol. We also 
  need to read the response according to the request version (i.e., can't 
  just assume the response is always on the latest version).

Good point. I'm passing in the protocol version to use on the fetchRequest to 
the AbstractFetcherThread. Also, not reading the response based on the request 
version.


- Aditya


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/#review82389
---


On May 7, 2015, 1:36 a.m., Aditya Auradkar wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/33378/
 ---
 
 (Updated May 7, 2015, 1:36 a.m.)
 
 
 Review request for kafka, Joel Koshy and Jun Rao.
 
 
 Bugs: KAFKA-2136
 https://issues.apache.org/jira/browse/KAFKA-2136
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Changes are:
 - protocol changes to the fetch reuqest and response to return the 
 throttle_time_ms to clients
 - New producer/consumer metrics to expose the avg and max delay time for a 
 client
 - Test cases
 
 For now the patch will publish a zero delay and return a response
 
 Added more tests
 
 
 Addressing Jun's comments
 
 
 Formatting changes
 
 
 Diffs
 -
 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 b2db91ca14bbd17fef5ce85839679144fff3f689 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
 8686d83aa52e435c6adafbe9ff4bd1602281072a 
   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
 eb8951fba48c335095cc43fc3672de1c733e07ff 
   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
 fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
 37ec0b79beafcf5735c386b066eb319fb697eff5 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  419541011d652becf0cda7a5e62ce813cddb1732 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
  8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  e3cc1967e407b64cc734548c19e30de700b64ba8 
   core/src/main/scala/kafka/api/FetchRequest.scala 
 b038c15186c0cbcc65b59479324052498361b717 
   core/src/main/scala/kafka/api/FetchResponse.scala 
 75aaf57fb76ec01660d93701a57ae953d877d81c 
   core/src/main/scala/kafka/api/ProducerRequest.scala 
 570b2da1d865086f9830aa919a49063abbbe574d 
   core/src/main/scala/kafka/api/ProducerResponse.scala 
 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
 31a2639477bf66f9a05d2b9b07794572d7ec393b 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 a439046e118b6efcc3a5a9d9e8acb79f85e40398 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 de6cf5bdaa0e70394162febc63b50b55ca0a92db 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 05078b24ef28f2f4e099afa943e43f1d00359fda 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 417960dd1ab407ebebad8fdb0e97415db3e91a2f 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 18680ce100f10035175cc0263ba7787ab0f6a17a 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 5717165f2344823fabe8f7cfafae4bb8af2d949a 
   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
 f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 
   

Re: Need Access to Wiki Page To Create Page for Discussion

2015-05-06 Thread Guozhang Wang
Bhavesh,

I could not find Bmis13 when adding you to the wiki permission. Could you
double check the account id?

Guozhang

On Wed, May 6, 2015 at 6:47 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Jun,

 The account id is Bmis13.

 Thanks,
 Bhavesh

 On Wed, May 6, 2015 at 4:52 PM, Jun Rao j...@confluent.io wrote:

  What your wiki user id?
 
  Thanks,
 
  Jun
 
  On Wed, May 6, 2015 at 11:09 AM, Bhavesh Mistry 
  mistry.p.bhav...@gmail.com
  wrote:
 
   Hi All,
  
   I need access to create Discussion or KIP document.  Let me know what
 is
   process of getting access.
  
   Thanks,
  
   Bhavesh
  
 




-- 
-- Guozhang


Re: [DISCUSS] KIP-21 Configuration Management

2015-05-06 Thread Jun Rao
One of the Chef users confirmed that Chef integration could still work if
all configs are moved to ZK. My rough understanding of how Chef works is
that a user first registers a service host with a Chef server. After that,
a Chef client will be run on the service host. The user can then push
config changes intended for a service/host to the Chef server. The server
is then responsible for pushing the changes to Chef clients. Chef clients
support pluggable logic. For example, it can generate a config file that
Kafka broker will take. If we move all configs to ZK, we can customize the
Chef client to use our config CLI to make the config changes in Kafka. In
this model, one probably doesn't need to register every broker in Chef for
the config push. Not sure if Puppet works in a similar way.

Also for storing the configs, we probably can't store the broker/global
level configs in Kafka itself (e.g. in a special topic). The reason is that
in order to start a broker, we likely need to make some broker level config
changes (e.g., the default log.dir may not be present, the default port may
not be available, etc). If we need a broker to be up to make those changes,
we get into this chicken and egg problem.

Thanks,

Jun

On Tue, May 5, 2015 at 4:14 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Sorry I missed the call today :)

 I think an additional requirement would be:
 Make sure that traditional deployment tools (Puppet, Chef, etc) are still
 capable of managing Kafka configuration.

 For this reason, I'd like the configuration refresh to be pretty close to
 what most Linux services are doing to force a reload of configuration.
 AFAIK, this involves handling HUP signal in the main thread to reload
 configuration. Then packaging scripts can add something nice like service
 kafka reload.

 (See Apache web server:
 https://github.com/apache/httpd/blob/trunk/build/rpm/httpd.init#L101)

 Gwen


 On Tue, May 5, 2015 at 8:54 AM, Joel Koshy jjkosh...@gmail.com wrote:

  Good discussion. Since we will be talking about this at 11am, I wanted
  to organize these comments into requirements to see if we are all on
  the same page.
 
  REQUIREMENT 1: Needs to accept dynamic config changes. This needs to
  be general enough to work for all configs that we envision may need to
  accept changes at runtime. e.g., log (topic), broker, client (quotas),
  etc.. possible options include:
  - ZooKeeper watcher
  - Kafka topic
  - Direct RPC to controller (or config coordinator)
 
  The current KIP is really focused on REQUIREMENT 1 and I think that is
  reasonable as long as we don't come up with something that requires
  significant re-engineering to support the other requirements.
 
  REQUIREMENT 2: Provide consistency of configs across brokers (modulo
  per-broker overrides) or at least be able to verify consistency.  What
  this effectively means is that config changes must be seen by all
  brokers eventually and we should be able to easily compare the full
  config of each broker.
 
  REQUIREMENT 3: Central config store. Needs to work with plain
  file-based configs and other systems (e.g., puppet). Ideally, should
  not bring in other dependencies (e.g., a DB). Possible options:
  - ZooKeeper
  - Kafka topic
  - other? E.g. making it pluggable?
 
  Any other requirements?
 
  Thanks,
 
  Joel
 
  On Tue, May 05, 2015 at 01:38:09AM +, Aditya Auradkar wrote:
   Hey Neha,
  
   Thanks for the feedback.
   1. In my earlier exchange with Jay, I mentioned the broker writing all
  it's configs to ZK (while respecting the overrides). Then ZK can be used
 to
  view all configs.
  
   2. Need to think about this a bit more. Perhaps we can discuss this
  during the hangout tomorrow?
  
   3  4) I viewed these config changes as mainly administrative
  operations. In the case, it may be reasonable to assume that the ZK port
 is
  available for communication from the machine these commands are run.
 Having
  a ConfigChangeRequest (or similar) is nice to have but having a new API
 and
  sending requests to controller also change how we do topic based
  configuration currently. I was hoping to keep this KIP as minimal as
  possible and provide a means to represent and modify client and broker
  based configs in a central place. Are there any concerns if we tackle
 these
  things in a later KIP?
  
   Thanks,
   Aditya
   
   From: Neha Narkhede [n...@confluent.io]
   Sent: Sunday, May 03, 2015 9:48 AM
   To: dev@kafka.apache.org
   Subject: Re: [DISCUSS] KIP-21 Configuration Management
  
   Thanks for starting this discussion, Aditya. Few questions/comments
  
   1. If you change the default values like it's mentioned in the KIP, do
  you
   also overwrite the local config file as part of updating the default
  value?
   If not, where does the admin look to find the default values, ZK or
 local
   Kafka config file? What if a config value is different in both places?
  
   2. I share Gwen's concern 

Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-05-06 Thread Harsha
Thanks for the review Joel. I agree don't need a init method we can use
configure. I'll update the KIP.
-Harsha

On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote:
 +1 with a minor comment: do we need an init method given it extends
 Configurable?
 
 Also, can you move this wiki out of drafts and add it to the table in
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?
 
 Thanks,
 
 Joel
 
 On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote:
  Thanks Jay. I removed partitioner.metadata from KIP. I’ll send an updated 
  patch.
  
  -- 
  Harsha
  Sent with Airmail
  
  On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
  wrote:
  
  Thanks for the comments everyone.
  Hi Jay,
       I do have a question regarding configurable interface on how to pass a 
  MapString, ? properties. I couldn’t find any other classes using it. JMX 
  reporter overrides it but doesn’t implement it.  So with configurable 
  partitioner how can a user pass in partitioner configuration since its 
  getting instantiated within the producer.
  
  Thanks,
  Harsha
  
  
  On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote:
  
  Hey Harsha,
  
  That proposal sounds good. One minor thing--I don't think we need to have
  the partitioner.metadata property. Our reason for using string properties
  is exactly to make config extensible at runtime. So a given partitioner can
  add whatever properties make sense using the configure() api it defines.
  
  -Jay
  
  On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote:
  
   Thanks Jay  Gianmarco for the comments. I picked the option A, if user
   sends a partition id than it will applied and partitioner.class method
   will only called if partition id is null .
   Please take a look at the updated KIP here
  
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
   . Let me know if you see anything missing.
  
   Thanks,
   Harsha
  
   On Fri, Apr 24, 2015, at 02:15 AM, Gianmarco De Francisci Morales wrote:
Hi,
   
   
Here are the questions I think we should consider:
 1. Do we need this at all given that we have the partition argument in
 ProducerRecord which gives full control? I think we do need it because
   this
 is a way to plug in a different partitioning strategy at run time and
   do it
 in a fairly transparent way.

   
Yes, we need it if we want to support different partitioning strategies
inside Kafka rather than requiring the user to code them externally.
   
   
 3. Do we need to add the value? I suspect people will have uses for
 computing something off a few fields in the value to choose the
   partition.
 This would be useful in cases where the key was being used for log
 compaction purposes and did not contain the full information for
   computing
 the partition.

   
I am not entirely sure about this. I guess that most partitioners should
not use it.
I think it makes it easier to reason about the system if the partitioner
only works on the key.
Hoever, if the value (and its serialization) are already available, 
there
is not much harm in passing them along.
   
   
 4. This interface doesn't include either an init() or close() method.
   It
 should implement Closable and Configurable, right?

   
Right now the only application I can think of to have an init() and
close()
is to read some state information (e.g., load information) that is
published on some external distributed storage (e.g., zookeeper) by the
brokers.
It might be useful also for reconfiguration and state migration.
   
I think it's not a very common use case right now, but if the added
complexity is not too much it might be worth to have support for these
methods.
   
   
   
 5. What happens if the user both sets the partition id in the
 ProducerRecord and sets a partitioner? Does the partition id just get
 passed in to the partitioner (as sort of implied in this interface?).
   This
 is a bit weird since if you pass in the partition id you kind of
   expect it
 to get used, right? Or is it the case that if you specify a partition
   the
 partitioner isn't used at all (in which case no point in including
 partition in the Partitioner api).


The user should be able to override the partitioner on a per-record 
basis
by explicitly setting the partition id.
I don't think it makes sense for the partitioners to take hints on the
partition.
   
I would even go the extra step, and have a default logic that accepts
both
key and partition id (current interface) and calls partition() only if
the
partition id is not set. The partition() method does *not* take the
partition ID as input (only key-value).
   
   
Cheers,
--
Gianmarco
   

[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays

2015-05-06 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531857#comment-14531857
 ] 

Aditya A Auradkar commented on KAFKA-2136:
--

Updated reviewboard https://reviews.apache.org/r/33378/diff/
 against branch origin/trunk

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays

2015-05-06 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2136:
-
Attachment: KAFKA-2136_2015-05-06_18:32:48.patch

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2136) Client side protocol changes to return quota delays

2015-05-06 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-2136:
-
Attachment: KAFKA-2136_2015-05-06_18:35:54.patch

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2136) Client side protocol changes to return quota delays

2015-05-06 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14531865#comment-14531865
 ] 

Aditya A Auradkar commented on KAFKA-2136:
--

Updated reviewboard https://reviews.apache.org/r/33378/diff/
 against branch origin/trunk

 Client side protocol changes to return quota delays
 ---

 Key: KAFKA-2136
 URL: https://issues.apache.org/jira/browse/KAFKA-2136
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar
 Attachments: KAFKA-2136.patch, KAFKA-2136_2015-05-06_18:32:48.patch, 
 KAFKA-2136_2015-05-06_18:35:54.patch


 As described in KIP-13, evolve the protocol to return a throttle_time_ms in 
 the Fetch and the ProduceResponse objects. Add client side metrics on the new 
 producer and consumer to expose the delay time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 33378: Patch for KAFKA-2136

2015-05-06 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/
---

(Updated May 7, 2015, 1:36 a.m.)


Review request for kafka and Joel Koshy.


Bugs: KAFKA-2136
https://issues.apache.org/jira/browse/KAFKA-2136


Repository: kafka


Description (updated)
---

Changes are:
- protocol changes to the fetch reuqest and response to return the 
throttle_time_ms to clients
- New producer/consumer metrics to expose the avg and max delay time for a 
client
- Test cases

For now the patch will publish a zero delay and return a response

Added more tests


Addressing Jun's comments


Formatting changes


Diffs (updated)
-

  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
ef9dd5238fbc771496029866ece1d85db6d7b7a5 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
8686d83aa52e435c6adafbe9ff4bd1602281072a 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
eb8951fba48c335095cc43fc3672de1c733e07ff 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
37ec0b79beafcf5735c386b066eb319fb697eff5 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 419541011d652becf0cda7a5e62ce813cddb1732 
  
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
e3cc1967e407b64cc734548c19e30de700b64ba8 
  core/src/main/scala/kafka/api/FetchRequest.scala 
b038c15186c0cbcc65b59479324052498361b717 
  core/src/main/scala/kafka/api/FetchResponse.scala 
75aaf57fb76ec01660d93701a57ae953d877d81c 
  core/src/main/scala/kafka/api/ProducerRequest.scala 
570b2da1d865086f9830aa919a49063abbbe574d 
  core/src/main/scala/kafka/api/ProducerResponse.scala 
5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
31a2639477bf66f9a05d2b9b07794572d7ec393b 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
a439046e118b6efcc3a5a9d9e8acb79f85e40398 
  core/src/main/scala/kafka/server/DelayedFetch.scala 
de6cf5bdaa0e70394162febc63b50b55ca0a92db 
  core/src/main/scala/kafka/server/DelayedProduce.scala 
05078b24ef28f2f4e099afa943e43f1d00359fda 
  core/src/main/scala/kafka/server/KafkaApis.scala 
417960dd1ab407ebebad8fdb0e97415db3e91a2f 
  core/src/main/scala/kafka/server/OffsetManager.scala 
18680ce100f10035175cc0263ba7787ab0f6a17a 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
5717165f2344823fabe8f7cfafae4bb8af2d949a 
  core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
00d59337a99ac135e8689bd1ecd928f7b1423d79 

Diff: https://reviews.apache.org/r/33378/diff/


Testing
---

New tests added


Thanks,

Aditya Auradkar



Re: [DISCUSSION] Reuse o.a.k.clients.NetworkClient in controller.

2015-05-06 Thread Jiangjie Qin
Hey Guozhang,

Good point about unify the handling of MetadataResponse and other
ClientResponses.

Thanks for all the feedbacks! I’ll take a shot and upload a patch for
review.

Jiangjie (Becket) Qin

On 5/6/15, 2:19 PM, Guozhang Wang wangg...@gmail.com wrote:

Jiangjie,

Just trying to figure the class reference hierarchy of this approach (A
-
B means A either has a member variable of B or take B as API parameter).

Metadata will have interface that takes in KafkaClient as a parameter, so
Metadata - KafkaClient

1. For producer:

KafkaProducer - Sender, KafkaProducer - Metadata, Sender - KafkaClient,
Sender - Metadata

2. For consumer:

KafkaConsumer - Coordinator, KafkaConsumer - Fetcher, KafkaConsumer -
KafkaClient, KafkaConsumer - Metadata, Coordinator - KafkaClient,
Coordinator - Metadata, Fetcher - KafkaClient, Fetcher - Metadata,

3. For controller:

KafkaController - KafkaClient

4. For replica fetcher:

ReplicaFetcher - KafkaClient

For producer / consumer, the interleaving seems a bit complicated to me.
Instead, we could completely remove the concept of Metadata from
KafkaClient, such that NetworkClient.handleCompletedReceives does not
specifically handle metadata responses, but just call
response.request().callback().onComplete(response) as well, which will try
to update the metadata and check for any errors.

Guozhang

On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Jun  Ewen,

 Thanks a lot for the comments. Both of them look better than my original
 plan.

 Jun, one downside about not changing NetworkClient but use a different
 metadata implementation is that NetworkClient will still have all the
 metadata related code there which makes it a little bit weird. I think
 Ewen¹s approach solve this problem.

 Ewen, If I understand correctly, you are proposing something similar to
 the structure we are using in
 AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread. That
 will make NetworkClient look clean but increase the layers which is
 probably OK.

 Inspired by your suggestions. I have another thought which seems closer
to
 Jun¹s idea. What if we move
maybeUpdateMetadata()/handleMetadataResponse()
 and related logic in NetworkClient to metadata and pass in NetworkClient
 as an argument. Like Jun suggested, we need a Metadata interface and
 different implementations.

 Thanks.

 Jiangjie (Becket) Qin



 On 5/5/15, 11:31 PM, Ewen Cheslack-Postava e...@confluent.io wrote:

 +1 on trying to reuse the NetworkClient code.
 
 I think Jun's approach could work, but I'm wondering if refactoring a
bit
 could get better separation of concerns without a somewhat awkward nop
 implementation of Metadata. I'm not sure what combination of
delegation or
 subclassing makes sense yet, but here's another approach that I think
 could
 work:
 
 * Get rid of metadata stuff from NetworkClient. Add a subclass that
also
 manages all the metadata. (Since it's used for both producer and
consumer,
 the obvious name that I first jumped to is ClientNetworkClient, but
 somehow
 I think we can come up with something less confusing.)
 * leastLoadedNode is the only caller of metadata.fetch() in that class,
 maybeUpdateMetadata is the only caller of leastLoadedNode,
 maybeUpdateMetadata is only called in poll when a combination of
metadata
 related timeouts end up being 0. These can be safely refactored into
the
 subclass with one override of poll(). Same with metadataFetchInProgress
 assuming the rest of the changes below.
 * Some of the default implementations (e.g. handleMetadataResponse)
can be
 left nops in NetworkClient and moved to the subclass.
 * Others can be overridden to call the super method then take the
 additional action necessary (e.g., on disconnect, move the metadata
update
 request to the subclass).
 * Making the timeout handling in poll() work for both NetworkClient and
 the
 new base class might be the messiest part and might require breaking
down
 the implementation of poll into multiple methods.
 * isReady uses metadataFetchInProgress and gets a timeout from the
 Metadata
 class. We can just override this method as well, though I feel like
 there's
 probably a cleaner solution.
 
 -Ewen
 
 
 On Tue, May 5, 2015 at 4:54 PM, Jun Rao j...@confluent.io wrote:
 
  Hi, Jiangjie,
 
  Thanks for taking on this.
 
  I was thinking that one way to decouple the dependency on Metadata in
  NetworkClient is the following.
  1. Make Metadata an interface.
  2. Rename current Metadata class to sth like KafkaMetadata that
 implements
  the Metadata interface.
  3. Have a new NoOpMetadata class that implements the Metadata
interface.
  This class
  3.1 does nothing for any write method
  3.2 returns max long for any method that asks for a timestamp
  3.3. returns an empty Cluster for fetch().
 
  Then we can leave NetworkClient unchanged and just pass in a
 NoOpMetadata
  when using NetworkClient in the controller. The consumer/producer
client
  will be using KafkaMetadata.
 

[jira] [Commented] (KAFKA-2172) Round-robin partition assignment strategy too restrictive

2015-05-06 Thread Bryan Baugher (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14530767#comment-14530767
 ] 

Bryan Baugher commented on KAFKA-2172:
--

I originally asked out on the mailing list about this[1] but I'm also having 
troubles with the round robin partitioning because of its requirements. Similar 
to the above it makes deployments difficult as when changing our topic 
subscriptions the consumer group stops consuming messages. In our case our 
consumers are building their topic subscriptions from config they retrieve 
regularly from a REST service. Every consumer should have the same topic 
subscription except when the config changes and there some lag before all 
consumers retrieve the new config.

Would you be open to a patch that provides another assignor which takes a 
simpler approach and just assigns each partition to a consumer interested in 
that topic with the least number of partitions assigned? This would not provide 
the optimal solution in the case where topic subscriptions are not equal but 
should generally do fine and should come up with the same answer as the round 
robin assignor when they are.

[1] - 
http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3CCANZ-JHE6TRf%2BHdT-%3DK9AKFVXasLjg445cmcRVEBi5tG93XTNqA%40mail.gmail.com%3E

 Round-robin partition assignment strategy too restrictive
 -

 Key: KAFKA-2172
 URL: https://issues.apache.org/jira/browse/KAFKA-2172
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg

 The round-ropin partition assignment strategy, was introduced for the 
 high-level consumer, starting with 0.8.2.1.  This appears to be a very 
 attractive feature, but it has an unfortunate restriction, which prevents it 
 from being easily utilized.  That is that it requires all consumers in the 
 consumer group have identical topic regex selectors, and that they have the 
 same number of consumer threads.
 It turns out this is not always the case for our deployments.  It's not 
 unusual to run multiple consumers within a single process (with different 
 topic selectors), or we might have multiple processes dedicated for different 
 topic subsets.  Agreed, we could change these to have separate group ids for 
 each sub topic selector (but unfortunately, that's easier said than done).  
 In several cases, we do at least have separate client.ids set for each 
 sub-consumer, so it would be incrementally better if we could at least loosen 
 the requirement such that each set of topics selected by a groupid/clientid 
 pair are the same.
 But, if we want to do a rolling restart for a new version of a consumer 
 config, the cluster will likely be in a state where it's not possible to have 
 a single config until the full rolling restart completes across all nodes.  
 This results in a consumer outage while the rolling restart is happening.
 Finally, it's especially problematic if we want to canary a new version for a 
 period before rolling to the whole cluster.
 I'm not sure why this restriction should exist (as it obviously does not 
 exist for the 'range' assignment strategy).  It seems it could be made to 
 work reasonably well with heterogenous topic selection and heterogenous 
 thread counts.  The documentation states that The round-robin partition 
 assignor lays out all the available partitions and all the available consumer 
 threads. It then proceeds to do a round-robin assignment from partition to 
 consumer thread.
 If the assignor can lay out all the available partitions and all the 
 available consumer threads, it should be able to uniformly assign partitions 
 to the available threads.  In each case, if a thread belongs to a consumer 
 that doesn't have that partition selected, just move to the next available 
 thread that does have the selection, etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)