[jira] [Commented] (KAFKA-1438) Migrate kafka client tools

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14021012#comment-14021012
 ] 

Sriharsha Chintalapani commented on KAFKA-1438:
---

[~junrao] [~nehanarkhede] 
I needed to update the tools-log4j.properties I missed it in the earlier patch. 
I ran the system tests 
_test_case_name  :  testcase_0001
_test_class_name  :  ReplicaBasicTest
arg : bounce_broker  :  false
arg : broker_type  :  leader
arg : message_producing_free_time_sec  :  15
arg : num_iteration  :  1
arg : num_messages_to_produce_per_producer_call  :  50
arg : num_partition  :  1
arg : replica_factor  :  3
arg : sleep_seconds_between_producer_calls  :  1
validation_status  :
 No. of messages from consumer on [test_1] at 
simple_consumer_test_1-0_r1.log  :  25000
 No. of messages from consumer on [test_1] at 
simple_consumer_test_1-0_r2.log  :  25000
 No. of messages from consumer on [test_1] at 
simple_consumer_test_1-0_r3.log  :  25000
 Unique messages from consumer on [test_1]  :  25000
 Unique messages from producer on [test_1]  :  25000
 Validate for data matched on topic [test_1]  :  PASSED
 Validate for data matched on topic [test_1] across replicas  :  PASSED
 Validate for merged log segment checksum in cluster [source]  :  PASSED
 Validate index log in cluster [source]  :  PASSED

> Migrate kafka client tools
> --
>
> Key: KAFKA-1438
> URL: https://issues.apache.org/jira/browse/KAFKA-1438
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sriharsha Chintalapani
>  Labels: newbie, tools, usability
> Fix For: 0.8.2
>
> Attachments: KAFKA-1438.patch, KAFKA-1438.patch, 
> KAFKA-1438_2014-05-27_11:45:29.patch, KAFKA-1438_2014-05-27_12:16:00.patch, 
> KAFKA-1438_2014-05-27_17:08:59.patch, KAFKA-1438_2014-05-28_08:32:46.patch, 
> KAFKA-1438_2014-05-28_08:36:28.patch, KAFKA-1438_2014-05-28_08:40:22.patch, 
> KAFKA-1438_2014-05-30_11:36:01.patch, KAFKA-1438_2014-05-30_11:38:46.patch, 
> KAFKA-1438_2014-05-30_11:42:32.patch
>
>
> Currently the console/perf client tools scatter across different packages, 
> we'd better to:
> 1. Move Consumer/ProducerPerformance and SimpleConsumerPerformance to tools 
> and remove the perf sub-project.
> 2. Move ConsoleConsumer from kafka.consumer to kafka.tools.
> 3. Move other consumer related tools from kafka.consumer to kafka.tools.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1438) Migrate kafka client tools

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1438:
--

Attachment: KAFKA-1438.patch

> Migrate kafka client tools
> --
>
> Key: KAFKA-1438
> URL: https://issues.apache.org/jira/browse/KAFKA-1438
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sriharsha Chintalapani
>  Labels: newbie, tools, usability
> Fix For: 0.8.2
>
> Attachments: KAFKA-1438.patch, KAFKA-1438.patch, 
> KAFKA-1438_2014-05-27_11:45:29.patch, KAFKA-1438_2014-05-27_12:16:00.patch, 
> KAFKA-1438_2014-05-27_17:08:59.patch, KAFKA-1438_2014-05-28_08:32:46.patch, 
> KAFKA-1438_2014-05-28_08:36:28.patch, KAFKA-1438_2014-05-28_08:40:22.patch, 
> KAFKA-1438_2014-05-30_11:36:01.patch, KAFKA-1438_2014-05-30_11:38:46.patch, 
> KAFKA-1438_2014-05-30_11:42:32.patch
>
>
> Currently the console/perf client tools scatter across different packages, 
> we'd better to:
> 1. Move Consumer/ProducerPerformance and SimpleConsumerPerformance to tools 
> and remove the perf sub-project.
> 2. Move ConsoleConsumer from kafka.consumer to kafka.tools.
> 3. Move other consumer related tools from kafka.consumer to kafka.tools.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1438) Migrate kafka client tools

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14021010#comment-14021010
 ] 

Sriharsha Chintalapani commented on KAFKA-1438:
---

Created reviewboard https://reviews.apache.org/r/22344/diff/
 against branch origin/trunk

> Migrate kafka client tools
> --
>
> Key: KAFKA-1438
> URL: https://issues.apache.org/jira/browse/KAFKA-1438
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Sriharsha Chintalapani
>  Labels: newbie, tools, usability
> Fix For: 0.8.2
>
> Attachments: KAFKA-1438.patch, KAFKA-1438.patch, 
> KAFKA-1438_2014-05-27_11:45:29.patch, KAFKA-1438_2014-05-27_12:16:00.patch, 
> KAFKA-1438_2014-05-27_17:08:59.patch, KAFKA-1438_2014-05-28_08:32:46.patch, 
> KAFKA-1438_2014-05-28_08:36:28.patch, KAFKA-1438_2014-05-28_08:40:22.patch, 
> KAFKA-1438_2014-05-30_11:36:01.patch, KAFKA-1438_2014-05-30_11:38:46.patch, 
> KAFKA-1438_2014-05-30_11:42:32.patch
>
>
> Currently the console/perf client tools scatter across different packages, 
> we'd better to:
> 1. Move Consumer/ProducerPerformance and SimpleConsumerPerformance to tools 
> and remove the perf sub-project.
> 2. Move ConsoleConsumer from kafka.consumer to kafka.tools.
> 3. Move other consumer related tools from kafka.consumer to kafka.tools.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Review Request 22344: Patch for KAFKA-1438

2014-06-07 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22344/
---

Review request for kafka.


Bugs: KAFKA-1438
https://issues.apache.org/jira/browse/KAFKA-1438


Repository: kafka


Description
---

KAFKA-1438. Migrate kafka client tools. Fixed tools-log4j config for tools.


Diffs
-

  config/test-log4j.properties a3ae33f20e4b7cff87d8cf8368d0639b8bea73a6 

Diff: https://reviews.apache.org/r/22344/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



Re: NoReplicaOnlineException with 0.8.1.1

2014-06-07 Thread Guozhang Wang
Hi,

>From the logs it seems you created the topic immediately after starting the
broker, and hence the controller has not fully catch up. Could you try
creating the topic until the broker log stables?

Guozhang


On Fri, Jun 6, 2014 at 3:14 PM, Prakash Gowri Shankor <
prakash.shan...@gmail.com> wrote:

> Hi,
>
> I am running  0.8.1.1 with one Zookeeper and one broker. I created a
> partition 'test2' as below:
>
> /kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
> --partitions 3 --topic test2
>
> I noticed this exception in the state-change.log. Why is this occuring ?
> Should I be running more brokers if I have more than one partition ?
>
> *Exception:*
>
> kafka.common.NoReplicaOnlineException: No replica for partition [test2,2]
> is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
>
> at
>
> kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
>
> at
>
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
>
> at
>
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
>
> at
>
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
>
> at
>
> kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
>
> at
>
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>
> at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
>
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
>
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
>
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
>
> at
>
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>
> at
>
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
>
> at
>
> kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:68)
>
> at
>
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:312)
>
> at
>
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:162)
>
> at
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:63)
>
> at
>
> kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:49)
>
> at
>
> kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
>
> at
>
> kafka.server.ZookeeperLeaderElector$$anonfun$startup$1.apply(ZookeeperLeaderElector.scala:47)
>
> at kafka.utils.Utils$.inLock(Utils.scala:538)
>
> at
>
> kafka.server.ZookeeperLeaderElector.startup(ZookeeperLeaderElector.scala:47)
>
> at
>
> kafka.controller.KafkaController$$anonfun$startup$1.apply$mcV$sp(KafkaController.scala:637)
>
> at
>
> kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:633)
>
> at
>
> kafka.controller.KafkaController$$anonfun$startup$1.apply(KafkaController.scala:633)
>
> at kafka.utils.Utils$.inLock(Utils.scala:538)
>
> at
> kafka.controller.KafkaController.startup(KafkaController.scala:633)
>
> at kafka.server.KafkaServer.startup(KafkaServer.scala:96)
>
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>
> at kafka.Kafka$.main(Kafka.scala:46)
>
> at kafka.Kafka.main(Kafka.scala)
>
>
> Thanks,
>
> Prakash
>



-- 
-- Guozhang


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020901#comment-14020901
 ] 

Jay Kreps commented on KAFKA-1316:
--

I updated this patch to
1. Address a number of comments.
2. Get all the unit tests passing (there were a couple of bugs that caused 
tests to sporadically hang)

I think overall there are two levels of feedback here. The first is to iron out 
whether the API actually makes sense and is convenient (i.e. is what we are 
trying to do worth doing) and then figure out any additional stylistic or 
correctness issues (i.e. have we done it well).

The current approach has two layers. The network layer is in 
org.apache.kafka.clients.network and has the selector and logic for sending and 
receiving size-delimited byte arrays across a bunch of connections.

The new NetworkClient/KafkaClient layer (name could be improved) is basically 
adding on top of this several concerns:
1. Serialization
2. Cluster metadata management
3. Connection management

So let's really put some thought into seeing if we have these layers right and 
have the right apis.

> Refactor Sender
> ---
>
> Key: KAFKA-1316
> URL: https://issues.apache.org/jira/browse/KAFKA-1316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
> KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch
>
>
> Currently most of the logic of the producer I/O thread is in Sender.java.
> However we will need to do a fair number of similar things in the new 
> consumer. Specifically:
>  - Track in-flight requests
>  - Fetch metadata
>  - Manage connection lifecycle
> It may be possible to refactor some of this into a helper class that can be 
> shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020899#comment-14020899
 ] 

Jay Kreps commented on KAFKA-1316:
--

Updated reviewboard https://reviews.apache.org/r/21937/
 against branch trunk

> Refactor Sender
> ---
>
> Key: KAFKA-1316
> URL: https://issues.apache.org/jira/browse/KAFKA-1316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
> KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch
>
>
> Currently most of the logic of the producer I/O thread is in Sender.java.
> However we will need to do a fair number of similar things in the new 
> consumer. Specifically:
>  - Track in-flight requests
>  - Fetch metadata
>  - Manage connection lifecycle
> It may be possible to refactor some of this into a helper class that can be 
> shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1316) Refactor Sender

2014-06-07 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps updated KAFKA-1316:
-

Attachment: KAFKA-1316_2014-06-07_11:20:38.patch

> Refactor Sender
> ---
>
> Key: KAFKA-1316
> URL: https://issues.apache.org/jira/browse/KAFKA-1316
> Project: Kafka
>  Issue Type: Sub-task
>  Components: producer 
>Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1316.patch, KAFKA-1316_2014-06-03_11:15:38.patch, 
> KAFKA-1316_2014-06-03_14:33:33.patch, KAFKA-1316_2014-06-07_11:20:38.patch
>
>
> Currently most of the logic of the producer I/O thread is in Sender.java.
> However we will need to do a fair number of similar things in the new 
> consumer. Specifically:
>  - Track in-flight requests
>  - Fetch metadata
>  - Manage connection lifecycle
> It may be possible to refactor some of this into a helper class that can be 
> shared with the consumer. This will require some detailed thought.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21937: Patch for KAFKA-1316

2014-06-07 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21937/
---

(Updated June 7, 2014, 6:20 p.m.)


Review request for kafka.


Bugs: KAFKA-1316
https://issues.apache.org/jira/browse/KAFKA-1316


Repository: kafka


Description
---

KAFKA-1316 Refactor a reusable NetworkClient interface out of Sender.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/ClientResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/KafkaClient.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
d15562a968d9e4b08f26b8d30986881adfe29e31 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 
f47a46159e9cf4161997470c6d4459cefcd0bd82 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 5ededccef7cb8e161822e9e5fc25e70a51dbe581 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 5ee5455a84446966e1e3c54d37d5f541cd30c8a3 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
9b1f5653548ba90defdae43940a5554066770b0a 
  clients/src/main/java/org/apache/kafka/common/metrics/Measurable.java 
7c2e33c3bf3b7acb6236553b9f6fd00881ffdf75 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java 
c9963cb68ee227be7e6618a82b53714364f11531 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Count.java 
efcd61bca983788c6132da5b23be847205f9421d 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Max.java 
c492c3811cfab21bbd0dcb36904acdaf6a17b63f 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Min.java 
bd0919cbec5bac9329e695480b57ad721eacfe51 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java 
8300978eaef59d19be70453b84d8b9bfb589f4eb 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 
4b481a54a4d532bc4ed50d30d986342eec6ef5e4 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java 
0d4056fbc6ec98e1ef7f467c90340cb7c8711090 
  clients/src/main/java/org/apache/kafka/common/metrics/stats/Total.java 
53dd3d5c2a0743111898d0483dc05f0cbb6306fc 
  clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
63504246320473a2eb5c300ffe73c433f7946a25 
  clients/src/main/java/org/apache/kafka/common/network/Selector.java 
3e358985ed72a894a71d683acc7460695d6f2056 
  clients/src/main/java/org/apache/kafka/common/network/Send.java 
d62dff9433f5098ae4662d98524efbaf9f57e62e 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
68b8827f3bdd64580e1b443fce5b8c63152dd94a 
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
428968cd38a7b12991f87868bf759926ff7e594e 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
6fa4a58f5f9792776a647e8f682d7faadc0d1556 
  clients/src/test/java/org/apache/kafka/clients/MockClient.java PRE-CREATION 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
 c4072ae90fb58101a67f83054fbe0b8349e71c2e 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
3ef692ca3e9fb83868a9da6f30c0705bb3d0aed2 
  clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
e4e0a049740d317efa9a5a43e1006e4fc49d817e 
  clients/src/test/java/org/apache/kafka/common/utils/MockTime.java 
cda8e644587aad9a8c9c96c222edc1ba27de1fb0 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
cd4ca2fa77763b090c6ad4ba4a5d46a6a8b76698 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
4da0f2c245f75ff0dcab4ecf0af085ab9f8da1bb 

Diff: https://reviews.apache.org/r/21937/diff/


Testing
---


Thanks,

Jay Kreps



Re: Review Request 21937: Patch for KAFKA-1316

2014-06-07 Thread Jay Kreps


> On June 6, 2014, 3:18 a.m., Neha Narkhede wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java,
> >  line 58
> > 
> >
> > Now we will end up with potentially two Senders - one for the 
> > producer's state machine and another for the consumer's state machine. Can 
> > we rename this one to sth like ProduerSender?
> 
> Guozhang Wang wrote:
> I think this is fine: this Sender is under 
> kafka.clients.producer.internal, the consumer Sender will be under 
> consumer.internal and there will not likely classes we need to import both.

I think the naming shouldn't conflict, though perhaps that name could be 
improved. The sender is a thread that sends (AKA produces data). The consumer 
won't have such a thread and it won't be sending (it will be receiving!).


> On June 6, 2014, 3:18 a.m., Neha Narkhede wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java,
> >  line 179
> > 
> >
> > Should we call client.close() here to close the selector?

Nice catch!


> On June 6, 2014, 3:18 a.m., Neha Narkhede wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java,
> >  line 47
> > 
> >
> > This API is unused. Can we remove it?

We can but eventually each of these requests needs to go from struct=>object 
and back to handle both client and server...


> On June 6, 2014, 3:18 a.m., Neha Narkhede wrote:
> > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java,
> >  line 69
> > 
> >
> > This API seems unused as well

Those two apis are meant to be used together

val r = new ProduceResponse()
r.addResponse(...)


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21937/#review44850
---


On June 3, 2014, 9:33 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21937/
> ---
> 
> (Updated June 3, 2014, 9:33 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1316
> https://issues.apache.org/jira/browse/KAFKA-1316
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1316 Refactor a reusable NetworkClient interface out of Sender.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ClientResponse.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> d15562a968d9e4b08f26b8d30986881adfe29e31 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 9b1f5653548ba90defdae43940a5554066770b0a 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 3e358985ed72a894a71d683acc7460695d6f2056 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
> 68b8827f3bdd64580e1b443fce5b8c63152dd94a 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 428968cd38a7b12991f87868bf759926ff7e594e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 6fa4a58f5f9792776a647e8f682d7faadc0d1556 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  c4072ae90fb58101a67f83054fbe0b8349e71c2e 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 3ef692ca3e9fb83868a9da6f30c0705bb3d0aed2 
>   clients/src/test/java/org/apache/kafka/common/utils/MockTime.java 
> cda8e644587aad9a8c9c96c222edc1ba27de1fb0 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> cd4ca2fa77763b090c6ad4ba4a5d46a6a8b76698 
>

Re: Review Request 21937: Patch for KAFKA-1316

2014-06-07 Thread Jay Kreps


> On June 6, 2014, 5:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 93
> > 
> >
> > Maybe rename to getReady()?

It isn't a getter. It actually is something more like

boolean becomeReady(node)

I think of the usage in the same way you would say "Ready the lifeboats!".

Alternately we could break this up and have it be something like
if(client.isReady(node))
  client.poll(...)
else
  client.makeReady(node)

I guess the only argument for the current approach is that there isn't much 
point in checking readiness unless you also intend to make it ready...


> On June 6, 2014, 5:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 158
> > 
> >
> > numInFlightRequests()?

Changed to inFlightRequestCount(), good?


> On June 6, 2014, 5:14 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java,
> >  line 490
> > 
> >
> > I made now to nowMs since we have both MilliSec and NanoSec in the 
> > producer used in metrics, and I felt it is better to indicate that which 
> > values are for Milli and which are for Nano.

Yeah that makes sense. We have always had the convention on the server-side as 
well as other projects of using now to be the current time in milliseconds. So 
I had kept that and differentiated nowNs. I think it is kind of nice to have a 
short three letter variable for the current time. Either way is probably fine 
but it should probably be consistent...


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21937/#review44921
---


On June 3, 2014, 9:33 p.m., Jay Kreps wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/21937/
> ---
> 
> (Updated June 3, 2014, 9:33 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1316
> https://issues.apache.org/jira/browse/KAFKA-1316
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> KAFKA-1316 Refactor a reusable NetworkClient interface out of Sender.
> 
> 
> Diffs
> -
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ClientResponse.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> d15562a968d9e4b08f26b8d30986881adfe29e31 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 9b1f5653548ba90defdae43940a5554066770b0a 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 3e358985ed72a894a71d683acc7460695d6f2056 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java 
> 68b8827f3bdd64580e1b443fce5b8c63152dd94a 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 428968cd38a7b12991f87868bf759926ff7e594e 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 6fa4a58f5f9792776a647e8f682d7faadc0d1556 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
>  c4072ae90fb58101a67f83054fbe0b8349e71c2e 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 3ef692ca3e9fb83868a9da6f30c0705bb3d0aed2 
>   clients/src/test/java/org/apache/kafka/common/utils/MockTime.java 
> cda8e644587aad9a8c9c96c222edc1ba27de1fb0 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> cd4ca2fa77763b090c6ad4ba4a5d46a6a8b76698 
> 
> Diff: https://reviews.apache.org/r/21937/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>



[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020857#comment-14020857
 ] 

Sriharsha Chintalapani commented on KAFKA-1382:
---

Updated reviewboard https://reviews.apache.org/r/21899/diff/
 against branch origin/trunk

> Update zkVersion on partition state update failures
> ---
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.2
>
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, 
> KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, 
> KAFKA-1382_2014-06-07_09:00:56.patch
>
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
> debug("Updated ISR for partition [%s,%d] to %s".format(topic, 
> partitionId, newIsr.mkString(",")))
> val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
> newIsr.map(r => r.brokerId).toList, zkVersion)
> // use the epoch of the controller that made the leadership decision, 
> instead of the current controller epoch
> val (updateSucceeded, newVersion) = 
> ZkUtils.conditionalUpdatePersistentPath(zkClient,
>   ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>   ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
> if (updateSucceeded){
>   inSyncReplicas = newIsr
>   zkVersion = newVersion
>   trace("ISR updated to [%s] and zkVersion updated to 
> [%d]".format(newIsr.mkString(","), zkVersion))
> } else {
>   info("Cached zkVersion [%d] not equal to that in zookeeper, skip 
> updating ISR".format(zkVersion))
> }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1382) Update zkVersion on partition state update failures

2014-06-07 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1382:
--

Attachment: KAFKA-1382_2014-06-07_09:00:56.patch

> Update zkVersion on partition state update failures
> ---
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.2
>
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, 
> KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, 
> KAFKA-1382_2014-06-07_09:00:56.patch
>
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
> debug("Updated ISR for partition [%s,%d] to %s".format(topic, 
> partitionId, newIsr.mkString(",")))
> val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
> newIsr.map(r => r.brokerId).toList, zkVersion)
> // use the epoch of the controller that made the leadership decision, 
> instead of the current controller epoch
> val (updateSucceeded, newVersion) = 
> ZkUtils.conditionalUpdatePersistentPath(zkClient,
>   ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>   ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
> if (updateSucceeded){
>   inSyncReplicas = newIsr
>   zkVersion = newVersion
>   trace("ISR updated to [%s] and zkVersion updated to 
> [%d]".format(newIsr.mkString(","), zkVersion))
> } else {
>   info("Cached zkVersion [%d] not equal to that in zookeeper, skip 
> updating ISR".format(zkVersion))
> }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 21899: Patch for KAFKA-1382

2014-06-07 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21899/
---

(Updated June 7, 2014, 4 p.m.)


Review request for kafka.


Bugs: KAFKA-1382
https://issues.apache.org/jira/browse/KAFKA-1382


Repository: kafka


Description (updated)
---

KAFKA-1382. Update zkVersion on partition state update failures.


KAFKA-1382. Update zkVersion on partition state update failures.


KAFKA-1382. Update zkVersion on partition state update failures.


KAFKA-1382. Update zkVersion on partition state update failures. added unit 
tests for ReplicationUtils


KAFKA-1382. Update zkVersion on partition state update failures. added unit 
tests for ReplicationUtils


KAFKA-1382. Update zkVersion on partition state update failures. added unit 
tests for ReplicationUtils.


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/controller/KafkaController.scala 
e776423b8a38da6f08b2262c8141abf2064d37d2 
  core/src/main/scala/kafka/controller/PartitionStateMachine.scala 
6457b56340a1b5440b07612f69dcffe4b051f919 
  core/src/main/scala/kafka/utils/ReplicationUtils.scala PRE-CREATION 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
fcbe269b6057b45793ea95f357890d5d6922e8d4 
  core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala PRE-CREATION 

Diff: https://reviews.apache.org/r/21899/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1362) Publish sources and javadoc jars

2014-06-07 Thread Daniel Wegener (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14020794#comment-14020794
 ] 

Daniel Wegener commented on KAFKA-1362:
---

kafka_2.10-0.8.1.1-sources.jar in maven central ist empty (see 
http://search.maven.org/#artifactdetails%7Corg.apache.kafka%7Ckafka_2.10%7C0.8.1.1%7Cjar).
 Is this intentional?

> Publish sources and javadoc jars
> 
>
> Key: KAFKA-1362
> URL: https://issues.apache.org/jira/browse/KAFKA-1362
> Project: Kafka
>  Issue Type: Bug
>  Components: packaging
>Affects Versions: 0.8.1
>Reporter: Stevo Slavic
>Assignee: Joel Koshy
>  Labels: build
> Fix For: 0.8.1.1
>
> Attachments: KAFKA-1362.patch
>
>
> Currently just binaries jars get published on Maven Central (see 
> http://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.1/ ). Please 
> also publish sources and javadoc jars.



--
This message was sent by Atlassian JIRA
(v6.2#6252)