Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Shlomi Hazan
Jun, while just a humble user, I would like to recall that it was just six
days ago that you told me on the user list that the producer is stable, when
I asked which producer to go with and whether the new producer is
production-stable (you can still see that email down the list).
Maybe I am missing something, but for me, stable includes the API.
So from where I am standing this change looks rather too big and too late to
make now. This kind of change will introduce generics, add a major mandatory
interface, and make the whole producer more complicated than it really has
to be when you consider only Kafka and not Avro.
I can see the obvious benefits for the many other use cases, but once you
declare something stable it is usually expected that the API will not
change unless something really big was discovered.
Now it may be the case that you discovered something big enough, so
personally I will not cast a vote.
Whether the benefits justify the change is for you guys to decide.
Shlomi

On Tue, Nov 25, 2014 at 6:43 AM, Sriram Subramanian 
srsubraman...@linkedin.com.invalid wrote:

 Looked at the patch. +1 from me.

 On 11/24/14 8:29 PM, Gwen Shapira gshap...@cloudera.com wrote:

 As one of the people who spent too much time building Avro repositories, +1
 on bringing the serializer API back.
 
 I think it will make the new producer easier to work with.
 
 Gwen
 
 On Mon, Nov 24, 2014 at 6:13 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   This is admittedly late in the release cycle to make a change. To add to
   Jun's description, the motivation was that we felt it would be better to
   change that interface now rather than after the release if it needed to
   change.
  
   The motivation for wanting to make a change was the ability to really be
   able to develop support for Avro and other serialization formats. The
   current status is pretty scattered--there is a schema repository on an Avro
   JIRA and another fork of that on github, and a bunch of people we have
   talked to have done similar things for other serialization systems. It
   would be nice if these things could be packaged in such a way that it was
   possible to just change a few configs in the producer and get rich metadata
   support for messages.
  
   As we were thinking this through we realized that the new api we were about
   to introduce was not very compatible with this since it was just byte[]
   oriented.
  
   You can always do this by adding some kind of wrapper api that wraps the
   producer. But this puts us back in the position of trying to document and
   support multiple interfaces.
  
   This also opens up the possibility of adding a MessageValidator or
   MessageInterceptor plug-in transparently, so that you can do other custom
   validation on the messages you are sending, which obviously requires access
   to the original object, not the byte array.
  
   This api doesn't prevent using byte[]: by configuring the
   ByteArraySerializer it works as it currently does.
  
   -Jay
 
  On Mon, Nov 24, 2014 at 5:58 PM, Jun Rao jun...@gmail.com wrote:
 
    Hi, Everyone,
   
    I'd like to start a discussion on whether it makes sense to add the
    serializer api back to the new java producer. Currently, the new java
    producer takes a byte array for both the key and the value. While this api
    is simple, it pushes the serialization logic into the application. This
    makes it hard to reason about what type of data is being sent to Kafka and
    also makes it hard to share an implementation of the serializer. For
    example, to support Avro, the serialization logic could be quite involved
    since it might need to register the Avro schema in some remote registry and
    maintain a schema cache locally, etc. Without a serialization api, it's
    impossible to share such an implementation so that people can easily reuse
    it. We sort of overlooked this implication during the initial discussion of
    the producer api.
   
    So, I'd like to propose an api change to the new producer by adding back
    the serializer api similar to what we had in the old producer. Specifically,
    the proposed api changes are the following.
   
    First, we change KafkaProducer to take generic types K and V for the key
    and the value, respectively.
   
    public class KafkaProducer<K,V> implements Producer<K,V> {
   
        public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
    callback);
   
        public Future<RecordMetadata> send(ProducerRecord<K,V> record);
    }
   
    Second, we add two new configs, one for the key serializer and another for
    the value serializer. Both serializers will default to the byte array
    implementation.
   
    public class ProducerConfig extends AbstractConfig {
   
        .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
    "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
    KEY_SERIALIZER_CLASS_DOC)
        .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
   

Re: Review Request 27634: Patch for KAFKA-1667

2014-11-25 Thread Dmytro Kostiuchenko

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27634/
---

(Updated Nov. 25, 2014, 11:04 a.m.)


Review request for kafka.


Bugs: KAFKA-1667
https://issues.apache.org/jira/browse/KAFKA-1667


Repository: kafka


Description
---

KAFKA-1667 Fixed bugs in LogConfig. Added test and documentation


KAFKA-1667 Updated tests to reflect new boolean property parsing logic


KAFKA-1667 renamed methods to match naming convention


KAFKA-1667 Added unit test to cover invalid configuration case


KAFKA-1667 Strict UncleanLeaderElection property parsing


KAFKA-1667 minor non-functional re-factoring


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
c4cea2cc072f4db4ce014b63d226431d3766bef1 
  core/src/main/scala/kafka/admin/TopicCommand.scala 
0b2735e7fc42ef9894bef1997b1f06a8ebee5439 
  core/src/main/scala/kafka/log/LogConfig.scala 
e48922a97727dd0b98f3ae630ebb0af3bef2373d 
  core/src/main/scala/kafka/utils/Utils.scala 
23aefb4715b177feae1d2f83e8b910653ea10c5f 
  core/src/test/scala/kafka/log/LogConfigTest.scala PRE-CREATION 
  core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
f44568cb25edf25db857415119018fd4c9922f61 

Diff: https://reviews.apache.org/r/27634/diff/


Testing
---


Thanks,

Dmytro Kostiuchenko



Re: Review Request 27634: Patch for KAFKA-1667

2014-11-25 Thread Dmytro Kostiuchenko


 On Nov. 21, 2014, 7:46 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/log/LogConfig.scala, lines 176-177
  https://reviews.apache.org/r/27634/diff/7/?file=765431#file765431line176
 
  This is a useful general check for the boolean type. Could we include the 
  validation in ConfigDef.parseType() when parsing the boolean type? We 
  probably want to make it case insensitive too. Then, we can make this a 
  boolean type.

We can't just change the current behaviour, since that would obviously break 
code in many places.
I would suggest introducing a BOOLEAN_STRICT type, or even introducing a notion 
of parsing strategy to ConfigDef (i.e. STRICT, TOLERANT, etc.); I prefer the 
first option. But IMO this is out of scope for the current ticket. I would 
gladly make these changes, but I would like to keep the changes for KAFKA-1667 
as local as possible.
Please let me know your opinion on this.
Thanks


- Dmytro
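
For concreteness, a sketch of the BOOLEAN_STRICT idea floated above (the class
and method names are hypothetical, not part of the patch): a strict parser
rejects anything but a case-insensitive true/false, unlike
Boolean.parseBoolean, which silently maps every other string to false.

public final class StrictBooleanParser {
    // Hypothetical helper; ConfigDef.parseType() could delegate to this
    // when a strict boolean type is requested.
    public static Boolean parse(String value) {
        String trimmed = value.trim();
        if (trimmed.equalsIgnoreCase("true"))
            return Boolean.TRUE;
        if (trimmed.equalsIgnoreCase("false"))
            return Boolean.FALSE;
        throw new IllegalArgumentException(
            "Expected 'true' or 'false' (case insensitive), got: " + value);
    }
}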


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27634/#review62608
---


On Nov. 25, 2014, 11:04 a.m., Dmytro Kostiuchenko wrote:
 
 [...]




[jira] [Updated] (KAFKA-1667) topic-level configuration not validated

2014-11-25 Thread Dmytro Kostiuchenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmytro Kostiuchenko updated KAFKA-1667:
---
Attachment: KAFKA-1667_2014-11-25_12:03:56.patch

  topic-level configuration not validated
 

 Key: KAFKA-1667
 URL: https://issues.apache.org/jira/browse/KAFKA-1667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1667_2014-11-05_19:43:53.patch, 
 KAFKA-1667_2014-11-06_17:10:14.patch, KAFKA-1667_2014-11-07_14:28:14.patch, 
 KAFKA-1667_2014-11-12_12:49:11.patch, KAFKA-1667_2014-11-16_18:31:34.patch, 
 KAFKA-1667_2014-11-16_18:33:10.patch, KAFKA-1667_2014-11-25_12:03:56.patch


 I was able to set the configuration for a topic to these invalid values:
 {code}
 Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2 
 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
 {code}
 It seems that the values are saved as long as they are the correct type, but 
 are not validated like the corresponding broker-level properties.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1667) topic-level configuration not validated

2014-11-25 Thread Dmytro Kostiuchenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224394#comment-14224394
 ] 

Dmytro Kostiuchenko commented on KAFKA-1667:


Updated reviewboard https://reviews.apache.org/r/27634/diff/
 against branch origin/trunk

  topic-level configuration not validated
 

 Key: KAFKA-1667
 URL: https://issues.apache.org/jira/browse/KAFKA-1667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
  Labels: newbie
 Attachments: KAFKA-1667_2014-11-05_19:43:53.patch, 
 KAFKA-1667_2014-11-06_17:10:14.patch, KAFKA-1667_2014-11-07_14:28:14.patch, 
 KAFKA-1667_2014-11-12_12:49:11.patch, KAFKA-1667_2014-11-16_18:31:34.patch, 
 KAFKA-1667_2014-11-16_18:33:10.patch, KAFKA-1667_2014-11-25_12:03:56.patch


 I was able to set the configuration for a topic to these invalid values:
 {code}
 Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2 
 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
 {code}
 It seems that the values are saved as long as they are the correct type, but 
 are not validated like the corresponding broker-level properties.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jonathan Weeks
+1 on this change; APIs are forever. As much as we'd love to see the 0.8.2 
release ship ASAP, it is important to get this right.

-JW

 On Nov 24, 2014, at 5:58 PM, Jun Rao jun...@gmail.com wrote:
 
 Hi, Everyone,
 
 I'd like to start a discussion on whether it makes sense to add the
 serializer api back to the new java producer. Currently, the new java
 producer takes a byte array for both the key and the value. While this api
 is simple, it pushes the serialization logic into the application. This
 makes it hard to reason about what type of data is being sent to Kafka and
 also makes it hard to share an implementation of the serializer. For
 example, to support Avro, the serialization logic could be quite involved
 since it might need to register the Avro schema in some remote registry and
 maintain a schema cache locally, etc. Without a serialization api, it's
 impossible to share such an implementation so that people can easily reuse
 it. We sort of overlooked this implication during the initial discussion of
 the producer api.
 
 So, I'd like to propose an api change to the new producer by adding back
 the serializer api similar to what we had in the old producer. Specifically,
 the proposed api changes are the following.
 
 First, we change KafkaProducer to take generic types K and V for the key
 and the value, respectively.
 
 public class KafkaProducer<K,V> implements Producer<K,V> {
 
    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback
 callback);
 
    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
 }
 
 Second, we add two new configs, one for the key serializer and another for
 the value serializer. Both serializers will default to the byte array
 implementation.
 
 public class ProducerConfig extends AbstractConfig {
 
    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
 "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
 KEY_SERIALIZER_CLASS_DOC)
    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
 "org.apache.kafka.clients.producer.ByteArraySerializer", Importance.HIGH,
 VALUE_SERIALIZER_CLASS_DOC);
 }
 
 Both serializers will implement the following interface.
 
 public interface Serializer<T> extends Configurable {
    public byte[] serialize(String topic, T data, boolean isKey);
 
    public void close();
 }
 
 This is more or less the same as what's in the old producer. The slight
 differences are (1) the serializer now only requires a parameter-less
 constructor; (2) the serializer has a configure() and a close() method for
 initialization and cleanup, respectively; (3) the serialize() method
 additionally takes the topic and an isKey indicator, both of which are
 useful for things like schema registration.
 
 The detailed changes are included in KAFKA-1797. For completeness, I also
 made the corresponding changes for the new java consumer api as well.
 
 Note that the proposed api changes are incompatible with what's in the
 0.8.2 branch. However, if those api changes are beneficial, it's probably
 better to include them now in the 0.8.2 release, rather than later.
 
 I'd like to discuss mainly two things in this thread.
 1. Do people feel that the proposed api changes are reasonable?
 2. Are there any concerns about including the api changes in the 0.8.2 final
 release?
 
 Thanks,
 
 Jun
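
For readers following along, here is a minimal sketch of what an
implementation of the proposed interface could look like. The Serializer<T>
shape is taken from Jun's proposal above; the class name Utf8StringSerializer
and the assumption that Configurable exposes configure(Map<String, ?> configs)
are illustrative, not part of the proposal.

import java.nio.charset.StandardCharsets;
import java.util.Map;

public class Utf8StringSerializer implements Serializer<String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No settings needed for plain UTF-8 strings; a schema-registry-backed
        // serializer would read its registry URL and cache sizes here.
    }

    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        // topic and isKey let a richer serializer register or look up a schema
        // per topic and per key-vs-value position.
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}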



Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Joe Stein
The serializer is an expected use of the producer/consumer now, and we think we
should continue that support in the new client. As for breaking the API, that
is why we released the 0.8.2-beta: to help get through just these types of
blocking issues in a way that the community at large could be involved in more
easily, with builds/binaries to download and use from Maven as well.

+1 on the change now prior to the 0.8.2 release.

- Joe Stein


On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian 
srsubraman...@linkedin.com.invalid wrote:

 Looked at the patch. +1 from me.

  On 11/24/14 8:29 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
   [...]

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Manikumar Reddy
+1 for this change.

What about the deserializer class in 0.8.2? Say I am using the new producer
with Avro and the old consumer in combination; then I need to provide a custom
Decoder implementation for Avro, right?

On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein joe.st...@stealth.ly wrote:

 The serializer is an expected use of the producer/consumer now, and we think
 we should continue that support in the new client. As for breaking the API,
 that is why we released the 0.8.2-beta: to help get through just these types
 of blocking issues in a way that the community at large could be involved in
 more easily, with builds/binaries to download and use from Maven as well.

 +1 on the change now prior to the 0.8.2 release.

 - Joe Stein


 On Mon, Nov 24, 2014 at 11:43 PM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:

  [...]

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Bhavesh Mistry
How will a mixed bag work on the consumer side? The entire site cannot be
rolled at once, so consumers will have to deal with both new and old
serialized bytes? This could be the app teams' responsibility. Are you guys
targeting the 0.8.2 release, which may break customers who are already using
the new producer API (beta version)?

Thanks,

Bhavesh

On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 +1 for this change.

 What about the deserializer class in 0.8.2? Say I am using the new producer
 with Avro and the old consumer in combination; then I need to provide a
 custom Decoder implementation for Avro, right?

  On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   [...]

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jay Kreps
Hey Shlomi,

I agree that we just blew this one from a timing perspective. We ideally
should have thought this through in the original api discussion. But as we
really started to think about this area we realized that the existing api
made it really hard to provide a simple way to package serialization and
data model stuff. I have heard that there is a saying that the best time to
plant a tree is a generation ago, but the second best time is right now.
And I think this is kind of along those lines.

This was really brought home to me as for the last month or so I have been
going around and talking to a lot of people using Kafka, and essentially
every one of them has had to make some kind of wrapper api. There is
nothing so terrible about these wrappers except that they make it hard to
have central documentation that explains how the system works, and they
usually strip off a lot of the functionality of the client, so you always
have to learn the in-house wrapper and can't really do everything you could
do with the main client. All the wrappers were trying to provide the same
few things: serialization, message validation, etc., and all of these depend
on having access to the original object. I think if we make this change on
serialization we can later add any additional hooks for message validation
with no compatibility problems.

-Jay

On Tue, Nov 25, 2014 at 12:12 AM, Shlomi Hazan shl...@viber.com wrote:

 Jun, while just a humble user, I would like to recall that it was just six
 days ago that you told me on the user list that the producer is stable, when
 I asked which producer to go with and whether the new producer is
 production-stable (you can still see that email down the list).
 Maybe I am missing something, but for me, stable includes the API.
 So from where I am standing this change looks rather too big and too late to
 make now. This kind of change will introduce generics, add a major mandatory
 interface, and make the whole producer more complicated than it really has
 to be when you consider only Kafka and not Avro.
 I can see the obvious benefits for the many other use cases, but once you
 declare something stable it is usually expected that the API will not
 change unless something really big was discovered.
 Now it may be the case that you discovered something big enough, so
 personally I will not cast a vote.
 Whether the benefits justify the change is for you guys to decide.
 Shlomi

 On Tue, Nov 25, 2014 at 6:43 AM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:

  [...]

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jun Rao
Shlomi,

Sorry, at that time, I didn't realize that we would be better off with an
api change. Yes, it sucks that we have to break the api. However, if we
have to change it, it's better to do it now rather than later.

Note that if you want to just produce byte[] to Kafka, you can still do
that with the api change. You just need to bind the producer with byte[],
and the default serializer will just work. Yes, there need to be code
changes. My hope is that no one has yet adopted the new producer api
widely, so making such code changes is not very painful.

Thanks,

Jun
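
A hedged sketch of what Jun describes above: binding the producer to byte[]
so the default serializers pass data through unchanged. The config key
strings and the ByteArraySerializer package follow the proposal in this
thread; the broker address and topic name are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawBytesProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        // These are already the proposed defaults; spelled out for clarity.
        props.put("key.serializer",
                  "org.apache.kafka.clients.producer.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.clients.producer.ByteArraySerializer");

        Producer<byte[], byte[]> producer =
            new KafkaProducer<byte[], byte[]>(props);
        producer.send(new ProducerRecord<byte[], byte[]>(
            "my-topic", "key".getBytes(), "value".getBytes()));
        producer.close();
    }
}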

On Tue, Nov 25, 2014 at 12:12 AM, Shlomi Hazan shl...@viber.com wrote:

 Jun, while just a humble user, I would like to recall that it was just six
 days ago that you told me on the user list that the producer is stable, when
 I asked which producer to go with and whether the new producer is
 production-stable (you can still see that email down the list).
 Maybe I am missing something, but for me, stable includes the API.
 So from where I am standing this change looks rather too big and too late to
 make now. This kind of change will introduce generics, add a major mandatory
 interface, and make the whole producer more complicated than it really has
 to be when you consider only Kafka and not Avro.
 I can see the obvious benefits for the many other use cases, but once you
 declare something stable it is usually expected that the API will not
 change unless something really big was discovered.
 Now it may be the case that you discovered something big enough, so
 personally I will not cast a vote.
 Whether the benefits justify the change is for you guys to decide.
 Shlomi

 On Tue, Nov 25, 2014 at 6:43 AM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:

  [...]

Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jun Rao
The old consumer already takes a deserializer when creating streams. So you
plug in your decoder there.

Thanks,

Jun
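
For reference, a rough sketch of the hook Jun means in the old high-level
consumer, assuming the 0.8.x kafka.javaapi.consumer API; MyAvroDecoder, the
group id, and the topic name are hypothetical placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;

public class OldConsumerDecoderSketch {
    // Hypothetical decoder; a real one would deserialize Avro bytes.
    static class MyAvroDecoder implements Decoder<Object> {
        public Object fromBytes(byte[] bytes) {
            return new String(bytes); // placeholder for Avro deserialization
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1);

        // The decoders are plugged in when the streams are created.
        Map<String, List<KafkaStream<byte[], Object>>> streams =
            connector.createMessageStreams(topicCountMap,
                                           new DefaultDecoder(null),
                                           new MyAvroDecoder());
    }
}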

On Tue, Nov 25, 2014 at 8:29 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 +1 for this change.

 What about the deserializer class in 0.8.2? Say I am using the new producer
 with Avro and the old consumer in combination; then I need to provide a
 custom Decoder implementation for Avro, right?

 On Tue, Nov 25, 2014 at 9:19 PM, Joe Stein joe.st...@stealth.ly wrote:

  [...]

Re: Review Request 27430: Fix KAFKA-1720

2014-11-25 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27430/#review63003
---



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/27430/#comment105181

In KAFKA-1583 we had discussed renaming ReplicaManager to 
ReplicatedLogManager or something like that.


- Joel Koshy


On Nov. 1, 2014, 12:21 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27430/
 ---
 
 (Updated Nov. 1, 2014, 12:21 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1720
 https://issues.apache.org/jira/browse/KAFKA-1720
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Rename delayed requests to delayed operations, change some class names
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/cluster/Partition.scala 
 1be57008e983fc3a831626ecf9a861f164fcca92 
   core/src/main/scala/kafka/server/DelayedFetch.scala 
 1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 
   core/src/main/scala/kafka/server/DelayedProduce.scala 
 8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 
   core/src/main/scala/kafka/server/DelayedRequestKey.scala 
 628ef59564b9b9238d7b05d26aef79d3cfec174d 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 3007a6d89b637b93f71fdb7adab561a93d9c4c62 
   core/src/main/scala/kafka/server/RequestPurgatory.scala 
 323b12e765f981e9bba736a204e4a8159e5c5ada 
   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
 a7720d579ea15b71511c9da0e241bd087de3674e 
   system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 
 
 Diff: https://reviews.apache.org/r/27430/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




[jira] [Commented] (KAFKA-1789) Issue with Async producer

2014-11-25 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224907#comment-14224907
 ] 

Jiangjie Qin commented on KAFKA-1789:
-

Do you mean that the enqueue took 250ms, or that the entire send took that much 
time? It looks like the enqueue finished within 20 ms but the send took 250ms 
to finish. Since it's an async send, some other settings could affect the 
sending time:
queue.buffering.max.messages: the size of the message queue
batch.num.messages: the max number of messages in a sending batch
Also, if you somehow have a slow broker, that could make the sending time 
longer.
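
For concreteness, a sketch of the old-producer settings mentioned above next
to the ones from the report; the class name is made up and the numeric values
are illustrative, not recommendations.

import java.util.Properties;

public class AsyncProducerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // illustrative
        props.put("producer.type", "async");
        // From the report: fail the enqueue after 20 ms when the queue is full.
        props.put("queue.enqueue.timeout.ms", "20");
        // Queue capacity; enqueue blocks (or times out) once this is reached.
        props.put("queue.buffering.max.messages", "10000");
        // Max number of messages sent in one async batch.
        props.put("batch.num.messages", "200");
        return props;
    }
}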

 Issue with Async producer
 -

 Key: KAFKA-1789
 URL: https://issues.apache.org/jira/browse/KAFKA-1789
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: devendra tagare
Priority: Critical

 Hi,
 We are using an async producer to send data to a kafka cluster. The event rate 
 at peak is around 250 events/second of size 25KB each.
 In the producer code base we have added specific debug statements to capture 
 the time taken to create a producer, create a keyed message with a byte 
 payload, and send the message.
 We have added the below properties to the producerConfig:
 queue.enqueue.timeout.ms=20
 send.buffer.bytes=1024000
 topic.metadata.refresh.interval.ms=3
 Based on the documentation, producer.send() queues the message on the async 
 producer's queue.
 So, ideally if the queue is full then the enqueue operation should result in 
 a kafka.common.QueueFullException in 20 ms.
 The logs indicate that the enqueue operation is taking more than 20ms (takes 
 around 250ms) without throwing any exceptions.
 Is there any other property that could conflict with queue.enqueue.timeout.ms 
 which is causing this behavior?
 Or is it possible that the queue is not full and yet the producer.send() call 
 is still taking around 200ms under peak load?
 Also, could you suggest any other alternatives so that we can either enforce 
 a timeout or throw an exception in case the async producer is taking more 
 than a specified amount of time.
 Regards,
 Dev



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1795) OOME - high level kafka consumer

2014-11-25 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14224911#comment-14224911
 ] 

Jiangjie Qin commented on KAFKA-1795:
-

Which GC tuning are you using? Are you using G1?

 OOME - high level kafka consumer
 

 Key: KAFKA-1795
 URL: https://issues.apache.org/jira/browse/KAFKA-1795
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1, 0.9.0
Reporter: Umesh Batra
Assignee: Neha Narkhede
Priority: Blocker
  Labels: newbie

 I am using Kafka High Level Consumer, version kafka_2.8.0 - 0.8.1.1 with 
 zkclient - 0.3 and I get OOME just after 10/15 minutes, My volume test setup 
 has just one topic with 10 partitions with continuous message (size ~500KB) 
 flow and below are my configuration; 
  
 zookeeper.connect=localhost:2181,localhost:2181
 group.id=tc
 consumer.id=tc
 zookeeper.sync.time.ms=200
 zookeeper.connection.timeout.ms=1
 zookeeper.session.timeout.ms=1
 fetch.size=50
 fetch.message.max.bytes=100
 auto.commit.enable=true
 auto.commit.interval.ms=100
 auto.offset.reset=largest
 queued.max.message.chunks=1
 backoff.increment.ms=1000
 rebalance.max.retries=10
 rebalance.retries.max=10
 rebalance.backoff.ms=1
 refresh.leader.backoff.ms=2
 consumer.timeout.ms=5 
  
 Memory histogram shows 60% of memory consumed by byte[] and most of the 
 remaining by char[] and HashMap$Node. In my various tries to recover from the 
 situation, I observed the metric-* thread still live even after I shut down 
 the Kafka connector. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
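
A back-of-envelope that often helps when chasing OOMEs in the high-level
consumer: each stream's chunk queue can hold up to queued.max.message.chunks
chunks of up to fetch.message.max.bytes each, so buffered bytes scale with
those two settings times the number of streams. A hedged sketch of that
approximation (not an exact accounting of consumer memory):

public final class ConsumerMemorySketch {
    // Approximate upper bound on bytes buffered by the high-level consumer:
    // chunks per queue * max bytes per chunk * number of streams.
    public static long estimateBufferedBytes(int queuedMaxMessageChunks,
                                             long fetchMessageMaxBytes,
                                             int numStreams) {
        return (long) queuedMaxMessageChunks * fetchMessageMaxBytes * numStreams;
    }
}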


Re: Does Kafka Producer service ?

2014-11-25 Thread Jun Rao
Could you be a bit more specific about the issue? As long as there is
protocol compatibility between the Kafka client and the broker, upgrading the
Kafka client library should be easy, right?

Thanks,

Jun

On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj reach.krishna...@gmail.com
wrote:

 Hello Amazing Kafka Creators & Users,

 I have learnt and used Kafka in our production system, so you can count my
 understanding as intermediate.

 With the statement that Kafka has solved the scalability and availability
 needs for a large-scale message publish/subscribe system, I understand
 that having a producer service which sits in between the application and
 the producer defeats one major purpose of Kafka.

 So, my question is: how do I loosely couple Kafka with my production
 application?

 The reason being, I wish to do all producer code and Kafka library
 maintenance without affecting my large-scale production system. It's not an
 easy thing to buy a window for these types of changes on a large-scale
 production application :)

 Any advice on how this can be achieved (even moderately) will greatly help.

 Thanks,
 Krishna Raj



[SECURITY DISCUSSION] Refactoring Brokers to support multiple ports

2014-11-25 Thread Gwen Shapira
Hi Everyone,

One of the pre-requisites we have for supporting multiple security
protocols (SSL, Kerberos) is to support them on separate ports.

This is done in KAFKA-1684 (The SSL Patch), but that patch addresses
several different issues - Multiple ports, enriching the channels, SSL
implementation - which makes it more challenging to review and to test.

I'd like to split this into 3 separate patches: multi-port brokers,
enriching SocketChannel, and the actual security implementations.

Since even just adding support for multiple listeners per broker is
somewhat involved and touches multiple components, I wrote a short design
document that covers the necessary changes and the upgrade process:

https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers

Comments are more than welcome :)

If this is acceptable, I hope to have a patch ready in a few days.

Gwen Shapira


Re: Does Kafka Producer service ?

2014-11-25 Thread Krishna Raj
Hi Jun,

Thanks for replying back on this. Appreciated.

I do understand that the Kafka client just needs protocol compatibility
with the application which is producing the messages.

To clarify a bit more:

I witnessed a scenario where a large-scale website uses the Kafka library
in their web application. So in this case, the Kafka libraries are tied to
the application, which is served by web servers.

So, when there was an issue caused by Kafka related to CPU usage, the team
wanted to apply a patch. In this case, in order to do so, they had to
create a new WAR package and deploy it again to the web servers, which is a
significant effort.

I totally understand that having a layer like a logging service in between
Kafka and the application will totally defeat the purpose of Kafka.

And I would love to know your advice on how best to handle this type of
maintenance.

Thanks,
Krishna Raj





On Tue, Nov 25, 2014 at 10:58 AM, Jun Rao jun...@gmail.com wrote:

 Could you be a bit more specific about the issue? As long as there is
 protocol compatibility between the Kafka client and the broker, upgrading the
 Kafka client library should be easy, right?

 Thanks,

 Jun

 On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj reach.krishna...@gmail.com
 wrote:

 [...]





[jira] [Updated] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2014-11-25 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1461:
-
Fix Version/s: 0.8.3

 Replica fetcher thread does not implement any back-off behavior
 ---

 Key: KAFKA-1461
 URL: https://issues.apache.org/jira/browse/KAFKA-1461
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.1.1
Reporter: Sam Meder
Assignee: nicu marasoiu
  Labels: newbie++
 Fix For: 0.8.3


 The current replica fetcher thread will retry in a tight loop if any error 
 occurs during the fetch call. For example, we've seen cases where the fetch 
 continuously throws a connection refused exception leading to several replica 
 fetcher threads that spin in a pretty tight loop.
 To a much lesser degree this is also an issue in the consumer fetcher thread, 
 although the fact that erroring partitions are removed so a leader can be 
 re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2014-11-25 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14225166#comment-14225166
 ] 

Joe Stein commented on KAFKA-1461:
--

[~n...@museglobal.ro] are you working on this patch? If not, can we set it back 
to unassigned so that someone who wants to can jump in and fix it? It sure is 
annoying when it happens (like waiting on "Recovering unflushed segment"): 
during that time, every replica fetching from the broker spews ERROR 
kafka.server.ReplicaFetcherThread.

 Replica fetcher thread does not implement any back-off behavior
 ---

 Key: KAFKA-1461
 URL: https://issues.apache.org/jira/browse/KAFKA-1461
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.1.1
Reporter: Sam Meder
Assignee: nicu marasoiu
  Labels: newbie++
 Fix For: 0.8.3


 The current replica fetcher thread will retry in a tight loop if any error 
 occurs during the fetch call. For example, we've seen cases where the fetch 
 continuously throws a connection refused exception leading to several replica 
 fetcher threads that spin in a pretty tight loop.
 To a much lesser degree this is also an issue in the consumer fetcher thread, 
 although the fact that erroring partitions are removed so a leader can be 
 re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1273) Brokers should make sure replica.fetch.max.bytes >= message.max.bytes

2014-11-25 Thread Patrick Lucas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225215#comment-14225215
 ] 

Patrick Lucas commented on KAFKA-1273:
--

I ran into this on 0.8.1.1.

Perhaps it could complain on startup if you override message.max.bytes in the 
config file, but note that Kafka also allows you to modify this value per-topic.

I have a topic with a higher value of message.max.bytes, and it was very 
difficult to diagnose why its replicas could not stay in sync.

 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes
 -

 Key: KAFKA-1273
 URL: https://issues.apache.org/jira/browse/KAFKA-1273
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.0
Reporter: Dong Zhong
Assignee: Sriharsha Chintalapani
  Labels: newbie

 If message.max.bytes is larger than replica.fetch.max.bytes, followers can't 
 fetch data from the leader and will retry endlessly. This may cause 
 high network traffic between followers and leaders.
 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes by 
 adding a sanity check, or throw an exception.
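 A minimal sketch of such a sanity check, assuming it runs against the broker's
 Properties at startup; the standalone class is illustrative, not the actual
 KafkaConfig code (the property names are Kafka's; the defaults shown are
 assumptions):
 {code}
import java.util.Properties;

public class ReplicaFetchSanityCheck {
    public static void validate(Properties brokerProps) {
        long messageMaxBytes = Long.parseLong(
                brokerProps.getProperty("message.max.bytes", "1000012"));
        long replicaFetchMaxBytes = Long.parseLong(
                brokerProps.getProperty("replica.fetch.max.bytes", "1048576"));
        if (replicaFetchMaxBytes < messageMaxBytes) {
            throw new IllegalArgumentException(
                "replica.fetch.max.bytes (" + replicaFetchMaxBytes
                + ") must be >= message.max.bytes (" + messageMaxBytes
                + "), otherwise followers cannot fetch the largest allowed message");
        }
    }
}
 {code}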



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1273) Brokers should make sure replica.fetch.max.bytes >= message.max.bytes

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1273:
-

[~sriharsha] Do you want to take a look?

 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes
 -

 Key: KAFKA-1273
 URL: https://issues.apache.org/jira/browse/KAFKA-1273
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.0
Reporter: Dong Zhong
Assignee: Sriharsha Chintalapani
  Labels: newbie

 If message.max.bytes is larger than replica.fetch.max.bytes, followers can't 
 fetch data from the leader and will retry endlessly. This may cause 
 high network traffic between followers and leaders.
 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes by 
 adding a sanity check, or throw an exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1273) Brokers should make sure replica.fetch.max.bytes >= message.max.bytes

2014-11-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225322#comment-14225322
 ] 

Sriharsha Chintalapani commented on KAFKA-1273:
---

[~nehanarkhede] Yes. working on reproducing it. 

 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes
 -

 Key: KAFKA-1273
 URL: https://issues.apache.org/jira/browse/KAFKA-1273
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.0
Reporter: Dong Zhong
Assignee: Sriharsha Chintalapani
  Labels: newbie

 If message.max.bytes is larger than replica.fetch.max.bytes, followers can't 
 fetch data from the leader and will retry endlessly. This may cause 
 high network traffic between followers and leaders.
 Brokers should make sure replica.fetch.max.bytes >= message.max.bytes by 
 adding a sanity check, or throw an exception.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1798) ConfigDef.parseType() should throw exception on invalid boolean value

2014-11-25 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1798:
--

 Summary: ConfigDef.parseType() should throw exception on invalid 
boolean value
 Key: KAFKA-1798
 URL: https://issues.apache.org/jira/browse/KAFKA-1798
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
 Fix For: 0.8.3


ConfigDef.parseType() currently uses Boolean.parseBoolean(trimmed) to parse a 
boolean value from a String. However, that simply returns false for anything 
that's not "true". It would be better to throw an exception if the input 
string is neither "true" nor "false".
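A minimal sketch of the stricter parsing being proposed; the method name and
exception type are illustrative, not the actual ConfigDef code:
{code}
public class StrictBooleanParser {
    // Unlike Boolean.parseBoolean, reject anything that is not exactly
    // "true" or "false" (ignoring case and surrounding whitespace).
    public static boolean parse(String value) {
        String trimmed = value.trim();
        if (trimmed.equalsIgnoreCase("true")) return true;
        if (trimmed.equalsIgnoreCase("false")) return false;
        throw new IllegalArgumentException(
            "Expected value to be either true or false, but it was: " + trimmed);
    }
}
{code}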



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1667) topic-level configuration not validated

2014-11-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1667:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
 Assignee: Dmytro Kostiuchenko
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

Also filed KAFKA-1798 for validating the string input for the boolean config 
type.

  topic-level configuration not validated
 

 Key: KAFKA-1667
 URL: https://issues.apache.org/jira/browse/KAFKA-1667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ryan Berdeen
Assignee: Dmytro Kostiuchenko
  Labels: newbie
 Fix For: 0.8.3

 Attachments: KAFKA-1667_2014-11-05_19:43:53.patch, 
 KAFKA-1667_2014-11-06_17:10:14.patch, KAFKA-1667_2014-11-07_14:28:14.patch, 
 KAFKA-1667_2014-11-12_12:49:11.patch, KAFKA-1667_2014-11-16_18:31:34.patch, 
 KAFKA-1667_2014-11-16_18:33:10.patch, KAFKA-1667_2014-11-25_12:03:56.patch


 I was able to set the configuration for a topic to these invalid values:
 {code}
 Topic:topic-config-test  PartitionCount:1  ReplicationFactor:2
 Configs:min.cleanable.dirty.ratio=-30.2,segment.bytes=-1,retention.ms=-12,cleanup.policy=lol
 {code}
 It seems that the values are saved as long as they are the correct type, but 
 are not validated like the corresponding broker-level properties.
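 A hedged sketch of the kind of per-property validation being asked for; the
 property names match the example above, but the validator class itself is
 illustrative, not Kafka's actual ConfigDef/LogConfig code:
 {code}
import java.util.Map;

public class TopicConfigValidator {
    public static void validate(Map<String, String> configs) {
        String ratio = configs.get("min.cleanable.dirty.ratio");
        if (ratio != null) {
            double d = Double.parseDouble(ratio);
            if (d < 0.0 || d > 1.0)
                throw new IllegalArgumentException(
                    "min.cleanable.dirty.ratio must be in [0, 1], got " + d);
        }
        String segmentBytes = configs.get("segment.bytes");
        if (segmentBytes != null && Long.parseLong(segmentBytes) <= 0)
            throw new IllegalArgumentException(
                "segment.bytes must be positive, got " + segmentBytes);
        String policy = configs.get("cleanup.policy");
        if (policy != null && !policy.equals("delete") && !policy.equals("compact"))
            throw new IllegalArgumentException(
                "cleanup.policy must be delete or compact, got " + policy);
    }
}
 {code}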



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1794) Make config and config defaults accessible to clients

2014-11-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225349#comment-14225349
 ] 

Guozhang Wang commented on KAFKA-1794:
--

One more note: the new producer config's explanation doc field is 
defined as private and hence not shown in the compiled javadoc. Although the 
explanations will be shown in the printed HTML table, I think it is also good 
to have them in the compiled javadoc.

 Make config and config defaults accessible to clients
 -

 Key: KAFKA-1794
 URL: https://issues.apache.org/jira/browse/KAFKA-1794
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Navina Ramesh

 In the new Kafka producer API, the ProducerConfig is not accessible to the 
 clients. Samza uses the ProducerConfig instance to access the defaults 
 property values, which can then be used in the various helper utils. Config 
 instance is accessible even without instantiating a Kafka producer. 
 With the new API, there is no way to instantiate a ProducerConfig as the 
 constructor is marked private. Also, it does not make the default config 
 values accessible to the client without actually instantiating a 
 KafkaProducer.
 Changes suggested:
 1. Make the ProducerConfig constructor public
 2. Make ConfigDef in ProducerConfig accessible by the client
 3. Use public static variables for kafka config default values 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1799) ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work

2014-11-25 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-1799:
--

 Summary: ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work
 Key: KAFKA-1799
 URL: https://issues.apache.org/jira/browse/KAFKA-1799
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


When running the following test, we got an unknown configuration exception.

@Test
public void testMetricsReporter() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:123");
producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"org.apache.kafka.clients.producer.new-metrics-reporter");
new KafkaProducer(producerProps);
}

org.apache.kafka.common.config.ConfigException: Unknown configuration 
'org.apache.kafka.clients.producer.new-metrics-reporter'
at 
org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:60)
at 
org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:91)
at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:147)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:105)
at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1799) ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work

2014-11-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1799:
---
 Priority: Blocker  (was: Major)
Affects Version/s: 0.8.2
Fix Version/s: 0.8.2

Marking this an 0.8.2 blocker.

 ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work
 --

 Key: KAFKA-1799
 URL: https://issues.apache.org/jira/browse/KAFKA-1799
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
Priority: Blocker
  Labels: newbie++
 Fix For: 0.8.2


 When running the following test, we got an unknown configuration exception.
 @Test
 public void testMetricsReporter() {
 Properties producerProps = new Properties();
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
 "host1:123");
 producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
 "org.apache.kafka.clients.producer.new-metrics-reporter");
 new KafkaProducer(producerProps);
 }
 org.apache.kafka.common.config.ConfigException: Unknown configuration 
 'org.apache.kafka.clients.producer.new-metrics-reporter'
   at 
 org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:60)
   at 
 org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:91)
   at 
 org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:147)
   at 
 org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:105)
   at 
 org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1799) ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225374#comment-14225374
 ] 

Jun Rao commented on KAFKA-1799:


This is because ConfigDef.parseType() assumes that if the input is a list, it's 
a list of integers. However, for ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG we 
are expecting a list of classes. So, in 
AbstractConfig.getConfiguredInstances(), we need to explicitly convert each 
string item to a class.
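A minimal sketch of that conversion, assuming reflection-based instantiation;
the helper below is illustrative, not the actual AbstractConfig code:
{code}
import java.util.ArrayList;
import java.util.List;

public class ConfiguredInstances {
    // Explicitly turn each configured class-name string into a Class,
    // then into an instance of the expected type.
    public static <T> List<T> create(List<String> classNames, Class<T> type)
            throws Exception {
        List<T> instances = new ArrayList<T>();
        for (String name : classNames) {
            Class<?> clazz = Class.forName(name);  // string -> Class
            instances.add(type.cast(clazz.newInstance()));
        }
        return instances;
    }
}
{code}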

 ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG doesn't work
 --

 Key: KAFKA-1799
 URL: https://issues.apache.org/jira/browse/KAFKA-1799
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.2


 When running the following test, we got an unknown configuration exception.
 @Test
 public void testMetricsReporter() {
 Properties producerProps = new Properties();
 producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
 "host1:123");
 producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, 
 "org.apache.kafka.clients.producer.new-metrics-reporter");
 new KafkaProducer(producerProps);
 }
 org.apache.kafka.common.config.ConfigException: Unknown configuration 
 'org.apache.kafka.clients.producer.new-metrics-reporter'
   at 
 org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:60)
   at 
 org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:91)
   at 
 org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:147)
   at 
 org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:105)
   at 
 org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:94)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1723) make the metrics name in new producer more standard

2014-11-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1723:
---
 Priority: Blocker  (was: Major)
Fix Version/s: (was: 0.8.3)
   0.8.2

Since changing the metric name convention is painful, it seems that we should 
sort this out in 0.8.2. So marking this an 0.8.2 blocker.

 make the metrics name in new producer more standard
 ---

 Key: KAFKA-1723
 URL: https://issues.apache.org/jira/browse/KAFKA-1723
 Project: Kafka
  Issue Type: Sub-task
  Components: clients
Affects Versions: 0.8.2
Reporter: Jun Rao
Priority: Blocker
 Fix For: 0.8.2


 The jmx name in the new producer looks like the following:
 kafka.producer.myclientid:type=mytopic
 However, this can be ambiguous since we allow "." in client ids and topic names.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-1800) KafkaException was not recorded at the per-topic metrics

2014-11-25 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1800:


 Summary: KafkaException was not recorded at the per-topic metrics
 Key: KAFKA-1800
 URL: https://issues.apache.org/jira/browse/KAFKA-1800
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.9.0


When KafkaException was thrown from producer.send() call, it is not recorded on 
the per-topic record-error-rate, but only the global error-rate.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1772) Add an Admin message type for request response

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225475#comment-14225475
 ] 

Jun Rao commented on KAFKA-1772:


Thanks for the patch. A few comments on the request/response format.

1. Do we need both utility and command? It's not clear whether all the 
combinations make sense. For example, what does Alter/Create producer mean? 
Would it be better just to consolidate them into a single field? It would be 
useful to document all the commands that we plan to support initially.
2. format: Do we intend to support any format other than JSON?
3. Could you document the format of the response as well?
4. Does the broker wait until the admin command completes before sending the 
response? Commands like partition reassignment can take a long time. 
Alternatively, the broker can send the response immediately after the command 
is issued. Then, the client can issue a describe command to check the status.

 Add an Admin message type for request response
 --

 Key: KAFKA-1772
 URL: https://issues.apache.org/jira/browse/KAFKA-1772
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Andrii Biletskyi
 Fix For: 0.8.3

 Attachments: KAFKA-1772.patch


 - utility int8
 - command int8
 - format int8
 - args variable length bytes
 utility 
 0 - Broker
 1 - Topic
 2 - Replication
 3 - Controller
 4 - Consumer
 5 - Producer
 Command
 0 - Create
 1 - Alter
 3 - Delete
 4 - List
 5 - Audit
 format
 0 - JSON
 args e.g. (which would equate to the data structure values == 2,1,0)
 meta-store: {
   {"zookeeper": "localhost:12913/kafka"}
 }
 args: {
   "partitions":
     [
       {"topic": "topic1", "partition": 0},
       {"topic": "topic1", "partition": 1},
       {"topic": "topic1", "partition": 2},

       {"topic": "topic2", "partition": 0},
       {"topic": "topic2", "partition": 1}
     ]
 }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1774) REPL and Shell Client for Admin Message RQ/RP

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225497#comment-14225497
 ] 

Jun Rao commented on KAFKA-1774:


We probably don't want the client library to drag in dependencies like JLine or 
jackson. So, we could either put the admin clients in core/kafka/tools or in a 
separate project (e.g. tools).

 REPL and Shell Client for Admin Message RQ/RP
 -

 Key: KAFKA-1774
 URL: https://issues.apache.org/jira/browse/KAFKA-1774
 Project: Kafka
  Issue Type: Sub-task
Reporter: Joe Stein
Assignee: Andrii Biletskyi
 Fix For: 0.8.3


 We should have a REPL we can work in and execute the commands with the 
 arguments. With this we can do:
 ./kafka.sh --shell 
 kafka>attach cluster -b localhost:9092;
 kafka>describe topic sampleTopicNameForExample;
 the command line version can work like it does now so folks don't have to 
 re-write all of their tooling.
 kafka.sh --topics -- everything the same as kafka-topics.sh is now
 kafka.sh --reassign -- everything the same as kafka-reassign-partitions.sh is now



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1752) add --replace-broker option

2014-11-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225512#comment-14225512
 ] 

Neha Narkhede commented on KAFKA-1752:
--

[~Dmitry Pekar], It is true that we do not need to do anything special while 
decommissioning brokers, but how do users specify that a broker needs to be 
decommissioned? There needs to be a way to differentiate a temporarily failed 
broker from a decommissioned one. Wouldn't the user use an admin tool to 
specify that the broker is decommissioned, which would then lead to us figuring 
out an ideal partition reassignment and executing it until the decommissioned 
broker owns nothing?

Or were you thinking about this differently?

 add --replace-broker option
 ---

 Key: KAFKA-1752
 URL: https://issues.apache.org/jira/browse/KAFKA-1752
 Project: Kafka
  Issue Type: Sub-task
  Components: tools
Reporter: Dmitry Pekar
Assignee: Dmitry Pekar
 Fix For: 0.8.3






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28108: KAFKA-1664: Kafka does not properly parse multiple ZK nodes with non-root chroot

2014-11-25 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28108/#review63083
---


Can you add a few test cases?

- Neha Narkhede


On Nov. 23, 2014, 5:37 a.m., Ashish Singh wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28108/
 ---
 
 (Updated Nov. 23, 2014, 5:37 a.m.)
 
 
 Review request for kafka.
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1664: Kafka does not properly parse multiple ZK nodes with non-root 
 chroot
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
 
 Diff: https://reviews.apache.org/r/28108/diff/
 
 
 Testing
 ---
 
 Tested with and without the fix.
 
 
 Thanks,
 
 Ashish Singh
 




[jira] [Commented] (KAFKA-1790) Remote controlled shutdown was removed

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225519#comment-14225519
 ] 

Jun Rao commented on KAFKA-1790:


The way that controlled shutdown works now requires coordination between the 
broker being shut down and the controller. So, the controlled shutdown has to 
be initiated from the broker that is shutting down. By default, controlled shutdown is 
enabled in the broker config. So on SIGTERM, the broker will wait until all 
leaders are moved off itself before shutting itself down. Is there a particular 
reason that you can't just SIGTERM the broker to perform the controlled shutdown?

 Remote controlled shutdown was removed
 --

 Key: KAFKA-1790
 URL: https://issues.apache.org/jira/browse/KAFKA-1790
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: James Oliver
Assignee: James Oliver
Priority: Blocker
 Fix For: 0.8.2


 In core:
 kafka.admin.ShutdownBroker was removed, rendering remote controlled shutdowns 
 impossible. 
 A Kafka administrator needs to be able to perform a controlled shutdown 
 without issuing a SIGTERM/SIGKILL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1796) Sanity check partition command line tools

2014-11-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225520#comment-14225520
 ] 

Neha Narkhede commented on KAFKA-1796:
--

[~guozhang] Can you add more details to the description about the specific 
improvements that you are suggesting? For example, which partition tool and 
what improvements to that tool?

 Sanity check partition command line tools
 -

 Key: KAFKA-1796
 URL: https://issues.apache.org/jira/browse/KAFKA-1796
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 0.0.9


 We need to sanity-check that the input JSON has valid values (for example, that the 
 replica list does not contain duplicate broker ids) before triggering the 
 admin process.
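 A minimal sketch of one such check (duplicate broker ids in a replica list);
 the input shape is illustrative, not the actual tool's JSON parsing:
 {code}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentSanityCheck {
    public static void checkNoDuplicateReplicas(String topic, int partition,
                                                List<Integer> replicas) {
        Set<Integer> unique = new HashSet<Integer>(replicas);
        if (unique.size() != replicas.size())
            throw new IllegalArgumentException(
                "Duplicate broker id in replica list for " + topic + "-"
                + partition + ": " + replicas);
    }
}
 {code}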



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1796) Sanity check partition command line tools

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1796:
-
Labels: newbie  (was: )

 Sanity check partition command line tools
 -

 Key: KAFKA-1796
 URL: https://issues.apache.org/jira/browse/KAFKA-1796
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.3


 We need to sanity-check that the input JSON has valid values (for example, that the 
 replica list does not contain duplicate broker ids) before triggering the 
 admin process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1796) Sanity check partition command line tools

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1796:
-
Fix Version/s: (was: 0.0.9)
   0.8.3

 Sanity check partition command line tools
 -

 Key: KAFKA-1796
 URL: https://issues.apache.org/jira/browse/KAFKA-1796
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.3


 We need to sanity-check that the input JSON has valid values (for example, that the 
 replica list does not contain duplicate broker ids) before triggering the 
 admin process.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1751) handle broker not exists and topic not exists scenarios

2014-11-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225528#comment-14225528
 ] 

Neha Narkhede commented on KAFKA-1751:
--

[~Dmitry Pekar] I've reviewed your latest patch. Comments are on the rb.

 handle broker not exists and topic not exists scenarios
 ---

 Key: KAFKA-1751
 URL: https://issues.apache.org/jira/browse/KAFKA-1751
 Project: Kafka
  Issue Type: Sub-task
  Components: tools
Reporter: Dmitry Pekar
Assignee: Dmitry Pekar
 Fix For: 0.8.3

 Attachments: KAFKA-1751.patch, KAFKA-1751_2014-11-17_16:25:14.patch, 
 KAFKA-1751_2014-11-17_16:33:43.patch, KAFKA-1751_2014-11-19_11:56:57.patch, 
 kafka-1751.patch


 merged with KAFKA-1750 to go through a single code review process



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1783) Missing slash in documentation for the Zookeeper paths in ZookeeperConsumerConnector

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1783:
-
Reviewer: Guozhang Wang

[~guozhang] Assigning to you for review. Feel free to reassign.

 Missing slash in documentation for the Zookeeper paths in 
 ZookeeperConsumerConnector
 

 Key: KAFKA-1783
 URL: https://issues.apache.org/jira/browse/KAFKA-1783
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jean-Francois Im
Assignee: Neha Narkhede
Priority: Trivial
 Attachments: kafka-missing-doc-slash.patch


 The documentation for the ZookeeperConsumerConnector refers to the consumer 
 id registry location as /consumers/[group_id]/ids[consumer_id]; it should be 
 /consumers/[group_id]/ids/[consumer_id], as evidenced by 
 registerConsumerInZK() and TopicCount.scala line 61.
 A patch is provided that adds the missing forward slash.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1788:
-
Fix Version/s: 0.8.3

 producer record can stay in RecordAccumulator forever if leader is not 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 buffer pool to become full and the callback for the produced message to block for 
 a long time.
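 One possible shape of a fix, sketched under the assumption that batches carry
 a creation timestamp; all names below are hypothetical, not the actual
 RecordAccumulator API:
 {code}
import java.util.Iterator;
import java.util.concurrent.TimeoutException;

public class BatchExpirer {
    // Expire batches that have waited longer than maxWaitMs, completing
    // their callbacks with an error instead of holding buffers forever.
    public static void expireOldBatches(Iterable<Batch> queued,
                                        long nowMs, long maxWaitMs) {
        Iterator<Batch> it = queued.iterator();
        while (it.hasNext()) {
            Batch b = it.next();
            if (nowMs - b.createdMs > maxWaitMs) {
                it.remove();  // frees buffer space (assumes removal is supported)
                b.callback.onCompletion(new TimeoutException(
                    "no leader for " + b.topicPartition + " after " + maxWaitMs + " ms"));
            }
        }
    }

    public static class Batch {
        public long createdMs;
        public String topicPartition;
        public Callback callback;
    }

    public interface Callback {
        void onCompletion(Exception e);
    }
}
 {code}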



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1788:
-
Component/s: producer 

 producer record can stay in RecordAccumulator forever if leader is not 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 buffer pool to become full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1788:
-
Affects Version/s: (was: 0.8.3)
   0.8.2

 producer record can stay in RecordAccumulator forever if leader is not 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 buffer pool to become full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is not available

2014-11-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1788:
-
Labels: newbie++  (was: )

 producer record can stay in RecordAccumulator forever if leader is not 
 available
 ---

 Key: KAFKA-1788
 URL: https://issues.apache.org/jira/browse/KAFKA-1788
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie++
 Fix For: 0.8.3


 In the new producer, when a partition has no leader for a long time (e.g., 
 all replicas are down), the records for that partition will stay in the 
 RecordAccumulator until the leader is available. This may cause the 
 buffer pool to become full and the callback for the produced message to block for 
 a long time.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225536#comment-14225536
 ] 

Jun Rao commented on KAFKA-1646:


The bug fix patch forces log recovery on clean shutdown. Could that be avoided 
since it will slow down startup?

 Improve consumer read performance for Windows
 -

 Key: KAFKA-1646
 URL: https://issues.apache.org/jira/browse/KAFKA-1646
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1
 Environment: Windows
Reporter: xueqiang wang
  Labels: newbie, patch
 Attachments: Improve consumer read performance for Windows.patch, 
 KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch


 This patch is for the Windows platform only. On Windows, if there is 
 more than one replica writing to disk, the segment log files will not be 
 consistent on disk and consumer read performance will drop 
 greatly. This fix allocates more disk space when rolling a new segment, and 
 it will improve consumer read performance on the NTFS file system.
 This patch doesn't affect file allocation on other filesystems, for it only 
 adds statements like 'if(Os.isWindows)' or adds methods used on Windows.
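 The core idea can be sketched in a few lines; this is an illustration of the
 preallocation trick, not the patch itself (which lives in Kafka's Scala log
 layer):
 {code}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class SegmentPreallocate {
    // Reserve the full segment size up front so NTFS can lay the file out
    // contiguously instead of growing it in fragments.
    public static void preallocate(File segment, long segmentBytes) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(segment, "rw");
        try {
            raf.setLength(segmentBytes);
        } finally {
            raf.close();
        }
    }
}
 {code}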



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1748) Decouple system test cluster resources definition from service definitions

2014-11-25 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225549#comment-14225549
 ] 

Neha Narkhede commented on KAFKA-1748:
--

[~guozhang], [~jjkoshy] Any chance someone at LI could see if these changes 
make sense and don't break the system tests?

 Decouple system test cluster resources definition from service definitions
 --

 Key: KAFKA-1748
 URL: https://issues.apache.org/jira/browse/KAFKA-1748
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
 Attachments: KAFKA-1748.patch, KAFKA-1748_2014-11-03_12:04:18.patch, 
 KAFKA-1748_2014-11-14_14:54:17.patch


 Currently the system tests use JSON files that specify the set of services 
 for each test and where they should run (i.e. hostname). These currently 
 assume that you already have SSH keys set up, use the same username on the 
 host running the tests and the test cluster, don't require any additional 
 ssh/scp/rsync flags, and assume you'll always have a fixed set of compute 
 resources (or that you'll spend a lot of time editing config files).
 While we don't want a whole cluster resource manager in the system tests, a 
 bit more flexibility would make it easier to, e.g., run tests against a local 
 vagrant cluster or on dynamically allocated EC2 instances. We can separate 
 out the basic resource spec (i.e. json specifying how to access machines) 
 from the service definition (i.e. a broker should run with settings x, y, z). 
 Restricting to a very simple set of mappings (i.e. map services to hosts with 
 round robin, optionally restricting to no reuse of hosts) should keep things 
 simple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1791) Corrupt index after safe shutdown and restart

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225560#comment-14225560
 ] 

Jun Rao commented on KAFKA-1791:


Could you attach the .log file?

 Corrupt index after safe shutdown and restart
 -

 Key: KAFKA-1791
 URL: https://issues.apache.org/jira/browse/KAFKA-1791
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1
 Environment: Debian6 with Sun-Java6
Reporter: Vamsi Subhash Achanta
Priority: Critical
 Attachments: 0233.index


 We have 3 Kafka brokers - all VMs. One of the brokers was stopped for around 
 30 minutes to fix a problem with the bare metal. Upon restart, after some 
 time, the broker ran out of file descriptors (FDs) and started throwing 
 errors. Upon restart, it started throwing these corrupted index exceptions. I 
 followed the other JIRAs related to corrupted indices, but the solutions 
 mentioned there (deleting the index and restarting) didn't work - the index gets 
 created again. The other JIRAs' solution of deleting those indexes which got 
 wrongly compacted (> 10MB) didn't work either.
 What is the error? How can I fix this and bring the broker back? Thanks.
 INFO [2014-11-21 02:57:17,510] [main][] kafka.log.LogManager - Found clean 
 shutdown file. Skipping recovery for all logs in data directory 
 '/var/lib/fk-3p-kafka/logs'
  INFO [2014-11-21 02:57:17,510] [main][] kafka.log.LogManager - Loading log 
 'kf.production.b2b.return_order.status-25'
 FATAL [2014-11-21 02:57:17,533] [main][] kafka.server.KafkaServerStartable - 
 Fatal error during KafkaServerStable startup. Prepare to shutdown
 java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
 index file 
 (/var/lib/fk-3p-kafka/logs/kf.production.b2b.return_order.status-25/0233.index)
  has non-zero size but the last offset is 233 and the base offset is 233
   at scala.Predef$.require(Predef.scala:145)
   at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
   at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:159)
   at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:158)
   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
   at 
 scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
   at 
 scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
   at kafka.log.Log.loadSegments(Log.scala:158)
   at kafka.log.Log.<init>(Log.scala:64)
   at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
   at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
   at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
   at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
   at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
   at kafka.log.LogManager.loadLogs(LogManager.scala:105)
   at kafka.log.LogManager.<init>(LogManager.scala:57)
   at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
   at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
   at kafka.Kafka$.main(Kafka.scala:46)
   at kafka.Kafka.main(Kafka.scala)
  INFO [2014-11-21 02:57:17,534] [main][] kafka.server.KafkaServer - [Kafka 
 Server 2], shutting down
  INFO [2014-11-21 02:57:17,538] [main][] kafka.server.KafkaServer - [Kafka 
 Server 2], shut down completed
  INFO [2014-11-21 02:57:17,539] [Thread-2][] kafka.server.KafkaServer - 
 [Kafka Server 2], shutting down



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2014-11-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225564#comment-14225564
 ] 

Jun Rao commented on KAFKA-1419:


This is being tracked in KAFKA-1054.

 cross build for scala 2.11
 --

 Key: KAFKA-1419
 URL: https://issues.apache.org/jira/browse/KAFKA-1419
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 0.8.1
Reporter: Scott Clasen
Assignee: Ivan Lyutov
Priority: Blocker
 Fix For: 0.8.1.2, 0.8.2

 Attachments: KAFKA-1419-scalaBinaryVersion.patch, 
 KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, 
 KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, 
 KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, 
 KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, 
 KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch


 Please publish builds for scala 2.11, hopefully just needs a small tweak to 
 the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 28121: Patch for KAFKA-1780

2014-11-25 Thread Ewen Cheslack-Postava


 On Nov. 25, 2014, 12:26 a.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/utils/IteratorTemplate.scala, line 46
  https://reviews.apache.org/r/28121/diff/1/?file=765889#file765889line46
 
  The change is larger than I expected it to be. What is the reason we 
  couldn't change the peek() implementation and add poll to the 
  IteratorTemplate? Since IteratorTemplate is not used elsewhere, it may be 
  an ok change. 
  
  But I guess you must've thought about this and chose not to do it this 
  way. Mind explaining the reasoning?
  
  I also think this is the right way to do things. However, weighing that 
  with the fact that we are making a somewhat large change to an API that is 
  going away, how bad is implementing peek by passing in a consumer timeout 
  of 0?
 
 Ewen Cheslack-Postava wrote:
 IteratorTemplate actually is used a bunch of other places via anonymous 
 classes, although admittedly not publicly. I initially started down the path 
 of only modifying IteratorTemplate, but it ends up being a bigger, awkward 
 change since the ConsumerIterator was the only implementation that could 
 support non-blocking operations. Things get even messier if you support both 
 peek and poll since now instead of a boolean you have to support a timeout. 
 Poll is important to support if you want to accurately hit timeouts when you 
 want to collect more than one message (e.g. a proxy). You'll notice the bulk 
 of the new non-test code is in NonBlockingIteratorTemplate.poll() because it 
 doesn't decompose nicely into the existing hasNext() and next() calls, which 
 means the bulk of the patch would still be there anyway. One possibility to 
 tighten it up would be to break the original steps into checkNext, getItem, 
 and clearItem, then implement hasNext, next, peek, and poll using those even 
 small
 er pieces. But I think breaking it down too much obfuscates the code and isn't 
really worth the minor deduplication.
 
 Neha Narkhede wrote:
 Makes sense. I'd imagine the checkNext refactoring won't buy us much. 
 Curious what you think about the consumer timeout=0 hack?

That's probably a workable solution in a lot of cases -- even for the proxy 
example since you're always going to be somewhat imprecise and the network is 
going to make it much more imprecise. But I think it's much harder to work with 
since now you have to figure out what a reasonable timeout is. 1ms to try to 
hit the target timeout? Or does that add a lot of overhead if you have a slow 
stream of data? Seems easier to just do it the right way :)


- Ewen
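For reference, a minimal sketch of the consumer-timeout approach discussed
above, using the old consumer's classes; this emulates a non-blocking poll,
and is not the patch's NonBlockingIteratorTemplate:
{code}
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.message.MessageAndMetadata;

public class TimeoutPoll {
    // Assumes the connector was created with consumer.timeout.ms set to a
    // small value (e.g. "1"), so hasNext() throws instead of blocking forever.
    public static MessageAndMetadata<byte[], byte[]> pollOnce(
            ConsumerIterator<byte[], byte[]> it) {
        try {
            return it.hasNext() ? it.next() : null;
        } catch (ConsumerTimeoutException e) {
            return null;  // nothing arrived within the configured timeout
        }
    }
}
{code}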


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/28121/#review62909
---


On Nov. 25, 2014, 1:24 a.m., Ewen Cheslack-Postava wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/28121/
 ---
 
 (Updated Nov. 25, 2014, 1:24 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1780
 https://issues.apache.org/jira/browse/KAFKA-1780
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Expand documentation on ConsumerIterator to reflect new non-blocking APIs.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
 78fbf75651583e390258af2d9f09df6911a97b59 
   core/src/main/scala/kafka/utils/IteratorTemplate.scala 
 fd952f3ec0f04a3ba639c02779634265489fd186 
   core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
 c0355cc0135c6af2e346b4715659353a31723b86 
   core/src/test/scala/unit/kafka/utils/IteratorTemplateTest.scala 
 46a4e899ef293c56a931bfa5bcf9a07d07ec5792 
 
 Diff: https://reviews.apache.org/r/28121/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Ewen Cheslack-Postava
 




Re: Does Kafka Producer service ?

2014-11-25 Thread Jun Rao
Assuming that you use a single producer in the async mode, the Kafka
overhead should be limited to a single thread. Using a cheaper compression
codec such as snappy will also help reduce the CPU load.

Thanks,

Jun
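
For illustration, a minimal sketch of that setup using the old (0.8.x)
producer API; the broker list and topic are placeholders:
{code}
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SharedAsyncProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");
        props.put("producer.type", "async");        // one background send thread
        props.put("compression.codec", "snappy");   // cheaper codec, lower CPU
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "hello"));
        producer.close();
    }
}
{code}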

On Tue, Nov 25, 2014 at 12:02 PM, Krishna Raj reach.krishna...@gmail.com
wrote:

 Hi Jun,

 Thanks for replying back on this. Appreciated.

 I do understand that the Kafka client just needs protocol compatibility
 with the application which is producing the messages.

 To clarify a bit more:

 I witnessed a scenario where a large-scale website uses the Kafka library
 in their web application. So in this case, the Kafka libraries are tied to
 the application, which is served by web servers.

 So, when there was a Kafka-related CPU usage issue, the team
 wanted to ship a patch. In order to do so, they had to
 build a new WAR package and redeploy it to the web servers, which is a
 significant effort.

 I totally understand that putting a layer like a logging service between
 Kafka and the application would defeat the purpose of Kafka.

 And I would love your advice on how best to handle this type of
 maintenance.

 Thanks,
 Krishna Raj





 On Tue, Nov 25, 2014 at 10:58 AM, Jun Rao jun...@gmail.com wrote:

 Could you be a bit more specific about the issue? As long as there is
 protocol compatibility between the Kafka client and the broker, upgrading the
 Kafka client library should be easy, right?

 Thanks,

 Jun

 On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj reach.krishna...@gmail.com
 wrote:

 Hello Amazing Kafka Creators & Users,

 I have learnt and used Kafka in our production system, so you can count
 my understanding as intermediate.

 With the statement that Kafka has solved the scalability and
 availability needs of a large-scale message publish/subscribe system, I
 understand that having a producer service which sits between the
 application and the Kafka producer defeats one major purpose of Kafka.

 So, my question is: how do I loosely couple Kafka with my production
 application?

 The reason being, I wish to do all producer code and Kafka library
 maintenance without affecting my large-scale production system. It's not
 easy to get a window for these types of changes on a large-scale
 production application :)

 Any advice on how this can be achieved (even moderately) will greatly
 help.

 Thanks,
 Krishna Raj






Re: Does Kafka Producer service ?

2014-11-25 Thread Harsha
If you want to separate your main application from the producer logic, you
can use or write something similar to 
https://github.com/mozilla-metrics/bagheera . Basically, have a service
that provides a REST API which your main application can call and that has a
Kafka producer to write to Kafka, as in the sketch below.
-Harsha
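
A rough sketch of that pattern, using the JDK's built-in HttpServer and the
old producer API; the endpoint path, topic, port, and broker list are
placeholders:
{code}
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerRestFacade {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        final Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(props));

        // The application POSTs message bytes here; only this service links
        // against the Kafka client library, so the library can be upgraded
        // without redeploying the application.
        HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
        server.createContext("/publish", new HttpHandler() {
            public void handle(HttpExchange exchange) throws IOException {
                InputStream in = exchange.getRequestBody();
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[4096];
                int n;
                while ((n = in.read(chunk)) != -1) buf.write(chunk, 0, n);
                producer.send(new KeyedMessage<byte[], byte[]>(
                    "app-events", buf.toByteArray()));
                exchange.sendResponseHeaders(204, -1);  // 204 No Content
                exchange.close();
            }
        });
        server.start();
    }
}
{code}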

On Tue, Nov 25, 2014, at 06:47 PM, Jun Rao wrote:
 Assuming that you use a single producer in the async mode, the Kafka
 overhead should be limited to a single thread. Using a cheaper compression
 codec such as snappy will also help reduce the CPU load.
 
 Thanks,
 
 Jun
 
 On Tue, Nov 25, 2014 at 12:02 PM, Krishna Raj
 reach.krishna...@gmail.com
 wrote:
 
  Hi Jun,
 
  Thanks for replying back on this. Appreciated.
 
  I do understand that the Kafka client just needs protocol compatibility
  with the application which is producing the messages.

  To clarify a bit more:

  I witnessed a scenario where a large-scale website uses the Kafka library
  in their web application. So in this case, the Kafka libraries are tied to
  the application, which is served by web servers.

  So, when there was a Kafka-related CPU usage issue, the team
  wanted to ship a patch. In order to do so, they had to
  build a new WAR package and redeploy it to the web servers, which is a
  significant effort.

  I totally understand that putting a layer like a logging service between
  Kafka and the application would defeat the purpose of Kafka.

  And I would love your advice on how best to handle this type of
  maintenance.
 
  Thanks,
  Krishna Raj
 
 
 
 
 
  On Tue, Nov 25, 2014 at 10:58 AM, Jun Rao jun...@gmail.com wrote:
 
  Could you be a bit more specific about the issue? As long as there is
  protocol compatibility between the Kafka client and the broker, upgrading the
  Kafka client library should be easy, right?
 
  Thanks,
 
  Jun
 
  On Mon, Nov 24, 2014 at 3:57 PM, Krishna Raj reach.krishna...@gmail.com
  wrote:
 
  Hello Amazing Kafka Creators & Users,

  I have learnt and used Kafka in our production system, so you can count
  my understanding as intermediate.

  With the statement that Kafka has solved the scalability and
  availability needs of a large-scale message publish/subscribe system, I
  understand that having a producer service which sits between the
  application and the Kafka producer defeats one major purpose of Kafka.

  So, my question is: how do I loosely couple Kafka with my production
  application?

  The reason being, I wish to do all producer code and Kafka library
  maintenance without affecting my large-scale production system. It's not
  easy to get a window for these types of changes on a large-scale
  production application :)

  Any advice on how this can be achieved (even moderately) will greatly
  help.
 
  Thanks,
  Krishna Raj
 
 
 
 


Re: Review Request 23702: Patch for KAFKA-1070

2014-11-25 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/
---

(Updated Nov. 26, 2014, 4:29 a.m.)


Review request for kafka.


Bugs: KAFKA-1070
https://issues.apache.org/jira/browse/KAFKA-1070


Repository: kafka


Description
---

KAFKA-1070. Auto-assign node id.


Diffs (updated)
-

  core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION 
  core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6e26c5436feb4629d17f199011f3ebb674aa767f 
  core/src/main/scala/kafka/server/KafkaServer.scala 
1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
  core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
0da774d0ed015bdc0461b854e3540ee6e48d1838 

Diff: https://reviews.apache.org/r/23702/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Updated] (KAFKA-1070) Auto-assign node id

2014-11-25 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1070:
--
Attachment: KAFKA-1070_2014-11-25_20:29:37.patch

 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.3

 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
 KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
 KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, 
 KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch


 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-11-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225725#comment-14225725
 ] 

Sriharsha Chintalapani commented on KAFKA-1070:
---

[~nehanarkhede] Thanks for the review. Updated the patch; please take a look.

 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.3

 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
 KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
 KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, 
 KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch


 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-11-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225723#comment-14225723
 ] 

Sriharsha Chintalapani commented on KAFKA-1070:
---

Updated reviewboard https://reviews.apache.org/r/23702/diff/
 against branch origin/trunk

 Auto-assign node id
 ---

 Key: KAFKA-1070
 URL: https://issues.apache.org/jira/browse/KAFKA-1070
 Project: Kafka
  Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
  Labels: usability
 Fix For: 0.8.3

 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
 KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
 KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, 
 KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch


 It would be nice to have Kafka brokers auto-assign node ids rather than 
 having that be a configuration. Having a configuration is irritating because 
 (1) you have to generate a custom config for each broker and (2) even though 
 it is in configuration, changing the node id can cause all kinds of bad 
 things to happen.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 23702: Patch for KAFKA-1070

2014-11-25 Thread Sriharsha Chintalapani


 On Nov. 23, 2014, 8:35 p.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/utils/ZkUtils.scala, line 714
  https://reviews.apache.org/r/23702/diff/7/?file=771304#file771304line714
 
  Why not use persistent sequential nodes instead?

If I used createPersistentSequential, it would keep creating a new node in ZK with a 
sequential number. That would leave too many nodes in ZK, depending on the number 
of brokers the user runs. Instead I used a single ZK path, updating it each time with 
an expected version of -1 and using the returned stat.getVersion as the sequential 
number.


- Sriharsha
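
A minimal sketch of that version trick with the raw ZooKeeper client (the
path is a placeholder and is assumed to already exist):
{code}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkSequenceFromVersion {
    // Writing with expected version -1 means "any version", so the write
    // always succeeds and bumps the node's version by exactly one; the
    // returned version serves as the next sequence number.
    public static int nextSequence(ZooKeeper zk, String path)
            throws KeeperException, InterruptedException {
        Stat stat = zk.setData(path, new byte[0], -1);
        return stat.getVersion();
    }
}
{code}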


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review62737
---


On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated Nov. 26, 2014, 4:29 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/GenerateBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 6e26c5436feb4629d17f199011f3ebb674aa767f 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 0da774d0ed015bdc0461b854e3540ee6e48d1838 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally

2014-11-25 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225727#comment-14225727
 ] 

Sriharsha Chintalapani commented on KAFKA-1507:
---

[~junrao] I've seen a few users request a create-topic ability in the 
producer itself. I can do an up-merge and resend the patch if there is interest 
in this JIRA.

 Using GetOffsetShell against non-existent topic creates the topic 
 unintentionally
 -

 Key: KAFKA-1507
 URL: https://issues.apache.org/jira/browse/KAFKA-1507
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: centos
Reporter: Luke Forehand
Assignee: Sriharsha Chintalapani
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-1507.patch, KAFKA-1507.patch, 
 KAFKA-1507_2014-07-22_10:27:45.patch, KAFKA-1507_2014-07-23_17:07:20.patch, 
 KAFKA-1507_2014-08-12_18:09:06.patch, KAFKA-1507_2014-08-22_11:06:38.patch, 
 KAFKA-1507_2014-08-22_11:08:51.patch


 A typo when using the GetOffsetShell command can cause a
 topic to be created which cannot be deleted (because deletion is still in
 progress):
 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
 kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1
 ./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo
 Topic:typo  PartitionCount:8  ReplicationFactor:1  Configs:
  Topic: typo  Partition: 0  Leader: 10  Replicas: 10  Isr: 10
 ...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2014-11-25 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14225799#comment-14225799
 ] 

Nicolae Marasoiu commented on KAFKA-1461:
-

I agree to give this to someone else; I have not made progress on it yet.

Thank you.

On Tuesday, November 25, 2014, Joe Stein (JIRA) j...@apache.org wrote:



 Replica fetcher thread does not implement any back-off behavior
 ---

 Key: KAFKA-1461
 URL: https://issues.apache.org/jira/browse/KAFKA-1461
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Affects Versions: 0.8.1.1
Reporter: Sam Meder
Assignee: nicu marasoiu
  Labels: newbie++
 Fix For: 0.8.3


 The current replica fetcher thread will retry in a tight loop if any error 
 occurs during the fetch call. For example, we've seen cases where the fetch 
 continuously throws a connection refused exception, leading to several replica 
 fetcher threads that spin in a pretty tight loop.
 To a much lesser degree this is also an issue in the consumer fetcher thread, 
 although the fact that erroring partitions are removed so that a leader can be 
 re-discovered helps somewhat.
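
As a rough illustration of the requested behavior (a back-off sketch with 
hypothetical names, not the patch under review): sleep with exponentially 
growing delay between failed fetches instead of retrying immediately.

    import java.io.IOException

    // Sketch: exponential back-off around a fetch call so an unreachable
    // leader does not cause the fetcher thread to spin. doFetch and the
    // bounds are stand-ins, not names from the actual patch.
    def fetchLoop(doFetch: () => Unit,
                  initialBackoffMs: Long = 250L,
                  maxBackoffMs: Long = 10000L): Unit = {
      var backoffMs = initialBackoffMs
      while (true) {
        try {
          doFetch()
          backoffMs = initialBackoffMs // reset after a successful fetch
        } catch {
          case _: IOException =>
            Thread.sleep(backoffMs)    // sleep instead of retrying at once
            backoffMs = math.min(backoffMs * 2, maxBackoffMs)
        }
      }
    }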





[jira] [Updated] (KAFKA-1791) Corrupt index after safe shutdown and restart

2014-11-25 Thread Vamsi Subhash Achanta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vamsi Subhash Achanta updated KAFKA-1791:
-
Attachment: 0233.log

Attached the log file.

 Corrupt index after safe shutdown and restart
 -

 Key: KAFKA-1791
 URL: https://issues.apache.org/jira/browse/KAFKA-1791
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1
 Environment: Debian6 with Sun-Java6
Reporter: Vamsi Subhash Achanta
Priority: Critical
 Attachments: 0233.index, 0233.log


 We have 3 Kafka brokers - all VMs. One of the brokers was stopped for around 
 30 minutes to fix a problem with the bare metal. Upon restart, after some 
 time, the broker ran out of file descriptors (FDs) and started throwing 
 errors. After another restart, it started throwing these corrupted index 
 exceptions. I followed the other JIRAs related to corrupted indices, but the 
 solutions mentioned there (deleting the index and restarting) didn't work - the 
 index gets created again. The other JIRAs' solution of deleting those indexes 
 which were wrongly compacted (> 10MB) didn't work either.
 What is the error? How can I fix this and bring the broker back? Thanks.
 INFO [2014-11-21 02:57:17,510] [main][] kafka.log.LogManager - Found clean 
 shutdown file. Skipping recovery for all logs in data directory 
 '/var/lib/fk-3p-kafka/logs'
  INFO [2014-11-21 02:57:17,510] [main][] kafka.log.LogManager - Loading log 
 'kf.production.b2b.return_order.status-25'
 FATAL [2014-11-21 02:57:17,533] [main][] kafka.server.KafkaServerStartable - 
 Fatal error during KafkaServerStable startup. Prepare to shutdown
 java.lang.IllegalArgumentException: requirement failed: Corrupt index found, 
 index file 
 (/var/lib/fk-3p-kafka/logs/kf.production.b2b.return_order.status-25/0233.index)
  has non-zero size but the last offset is 233 and the base offset is 233
   at scala.Predef$.require(Predef.scala:145)
   at kafka.log.OffsetIndex.sanityCheck(OffsetIndex.scala:352)
   at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:159)
   at kafka.log.Log$$anonfun$loadSegments$5.apply(Log.scala:158)
   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
   at 
 scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
   at 
 scala.collection.JavaConversions$JCollectionWrapper.foreach(JavaConversions.scala:495)
   at kafka.log.Log.loadSegments(Log.scala:158)
   at kafka.log.Log.<init>(Log.scala:64)
   at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
   at 
 kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
   at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
   at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
   at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
   at kafka.log.LogManager.loadLogs(LogManager.scala:105)
   at kafka.log.LogManager.<init>(LogManager.scala:57)
   at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
   at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
   at 
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
   at kafka.Kafka$.main(Kafka.scala:46)
   at kafka.Kafka.main(Kafka.scala)
  INFO [2014-11-21 02:57:17,534] [main][] kafka.server.KafkaServer - [Kafka 
 Server 2], shutting down
  INFO [2014-11-21 02:57:17,538] [main][] kafka.server.KafkaServer - [Kafka 
 Server 2], shut down completed
  INFO [2014-11-21 02:57:17,539] [Thread-2][] kafka.server.KafkaServer - 
 [Kafka Server 2], shutting down
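
The failing requirement can be restated compactly (a paraphrase with 
hypothetical parameter names, not the code in OffsetIndex.scala): a non-empty 
index file must have a last offset strictly greater than its base offset, and 
here both are 233.

    // Paraphrase of the failing sanity check: an index file with non-zero
    // size must end past its base offset.
    def sanityCheck(fileSizeBytes: Long, baseOffset: Long, lastOffset: Long): Unit =
      require(fileSizeBytes == 0 || lastOffset > baseOffset,
        s"Corrupt index found: non-zero size but the last offset is " +
        s"$lastOffset and the base offset is $baseOffset")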


