Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Andrii Biletskyi
Gwen,

Thanks for bringing this up!
Regarding UpdateMetadata in KIP-4 - no, it shouldn't be used in the Admin CLI;
it's an internal server message. We will probably use TMR there (depending on
how the generic re-routing facility goes), but TMR is already used in
NetworkClient, so I believe there are no doubts about it: it should be ported
to Java.

Thanks,
Andrii Biletskyi


On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 I think those two requests are only used by controller to broker
 communication. Not sure if client side will need them in KIP-4, unlikely I
 guess.

 Jiangjie (Becket) Qin

 On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,
 
 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.
 
 Help is appreciated :)
 
 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.
 
 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?
 
 Gwen




[jira] [Commented] (KAFKA-1858) Make ServerShutdownTest a bit less flaky

2015-03-18 Thread Adrian Preston (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366975#comment-14366975
 ] 

Adrian Preston commented on KAFKA-1858:
---

Created reviewboard https://reviews.apache.org/r/32199/diff/
 against branch origin/trunk

 Make ServerShutdownTest a bit less flaky
 

 Key: KAFKA-1858
 URL: https://issues.apache.org/jira/browse/KAFKA-1858
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
 Attachments: KAFKA-1858.patch


 ServerShutdownTest currently:
 * Starts a KafkaServer
 * Does stuff
 * Stops the server
 * Counts if there are any live kafka threads
 This is fine on its own. But when running in a test suite (i.e. gradle test), 
 the test is very sensitive to whether every other test has freed all of its resources. If 
 you start a server in a previous test and forget to close it, the 
 ServerShutdownTest will find threads from the previous test and fail.
 This makes for a flaky test that is pretty challenging to troubleshoot.
 I suggest counting the threads at the beginning and end of each test in the 
 class, and only failing if the number at the end is greater than the number 
 at the beginning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1858) Make ServerShutdownTest a bit less flaky

2015-03-18 Thread Adrian Preston (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Preston updated KAFKA-1858:
--
Attachment: KAFKA-1858.patch

 Make ServerShutdownTest a bit less flaky
 

 Key: KAFKA-1858
 URL: https://issues.apache.org/jira/browse/KAFKA-1858
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
 Attachments: KAFKA-1858.patch


 ServerShutdownTest currently:
 * Starts a KafkaServer
 * Does stuff
 * Stops the server
 * Counts if there are any live kafka threads
 This is fine on its own. But when running in a test suite (i.e. gradle test), 
 the test is very sensitive to whether every other test has freed all of its resources. If 
 you start a server in a previous test and forget to close it, the 
 ServerShutdownTest will find threads from the previous test and fail.
 This makes for a flaky test that is pretty challenging to troubleshoot.
 I suggest counting the threads at the beginning and end of each test in the 
 class, and only failing if the number at the end is greater than the number 
 at the beginning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32199: Patch for KAFKA-1858

2015-03-18 Thread Adrian Preston

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32199/
---

Review request for kafka.


Bugs: KAFKA-1858
https://issues.apache.org/jira/browse/KAFKA-1858


Repository: kafka


Description
---

KAFKA-1858. Better checking for threads being cleaned up


Diffs
-

  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 

Diff: https://reviews.apache.org/r/32199/diff/


Testing
---


Thanks,

Adrian Preston



[jira] [Commented] (KAFKA-1858) Make ServerShutdownTest a bit less flaky

2015-03-18 Thread Adrian Preston (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366985#comment-14366985
 ] 

Adrian Preston commented on KAFKA-1858:
---

As per the description in the bug report - the patch tightens up the checking by 
comparing the threads running when a test starts with the threads running when 
the test completes. If any new Kafka threads are found, the test fails 
with an AssertionFailedError containing a list of the new threads.
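
For illustration only, a minimal sketch of that before/after check (hypothetical helper names, not the actual patch):

{code}
import scala.collection.JavaConverters._
import junit.framework.AssertionFailedError
import org.junit.{After, Before}

class ThreadLeakCheck {
  // Kafka-related threads that already existed before the test ran.
  private var preExisting: Set[String] = Set.empty

  private def kafkaThreadNames: Set[String] =
    Thread.getAllStackTraces.keySet.asScala
      .map(_.getName)
      .filter(_.toLowerCase.contains("kafka"))
      .toSet

  @Before
  def recordExistingThreads(): Unit =
    preExisting = kafkaThreadNames

  @After
  def verifyNoNewThreads(): Unit = {
    // Fail only on threads created (and not cleaned up) by this test,
    // ignoring leftovers from earlier tests in the suite.
    val leaked = kafkaThreadNames -- preExisting
    if (leaked.nonEmpty)
      throw new AssertionFailedError("Leaked Kafka threads: " + leaked.mkString(", "))
  }
}
{code}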

 Make ServerShutdownTest a bit less flaky
 

 Key: KAFKA-1858
 URL: https://issues.apache.org/jira/browse/KAFKA-1858
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
 Attachments: KAFKA-1858.patch


 ServerShutdownTest currently:
 * Starts a KafkaServer
 * Does stuff
 * Stops the server
 * Counts if there are any live kafka threads
 This is fine on its own. But when running in a test suite (i.e. gradle test), 
 the test is very sensitive to whether every other test has freed all of its resources. If 
 you start a server in a previous test and forget to close it, the 
 ServerShutdownTest will find threads from the previous test and fail.
 This makes for a flaky test that is pretty challenging to troubleshoot.
 I suggest counting the threads at the beginning and end of each test in the 
 class, and only failing if the number at the end is greater than the number 
 at the beginning.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1759) transient unit test failure in PartitionAssignorTest

2015-03-18 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367063#comment-14367063
 ] 

Rajini Sivaram commented on KAFKA-1759:
---

This has been fixed under [KAFKA-1823]

 transient unit test failure in PartitionAssignorTest
 

 Key: KAFKA-1759
 URL: https://issues.apache.org/jira/browse/KAFKA-1759
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
 Fix For: 0.8.3


 Saw the following transient failure.
 unit.kafka.consumer.PartitionAssignorTest  testRoundRobinPartitionAssignor 
 FAILED
 java.lang.UnsupportedOperationException: empty.max
 at 
 scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
 at scala.collection.AbstractIterator.max(Iterator.scala:1157)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer

2015-03-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367189#comment-14367189
 ] 

Jiangjie Qin commented on KAFKA-2019:
-

Yeah, you are right. I somehow thought the partition would be hashed to a 
particular thread. Yes, hashing the thread id will give no worse balance than 
the current round robin. Currently we are hashing over the topic partition; 
maybe changing it to only hash the thread id is enough, so we don't need to hash both.

 RoundRobinAssignor clusters by consumer
 ---

 Key: KAFKA-2019
 URL: https://issues.apache.org/jira/browse/KAFKA-2019
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Joseph Holsten
Assignee: Neha Narkhede
Priority: Minor
 Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, 
 KAFKA-2019.patch


 When rolling out a change today, I noticed that some of my consumers are 
 greedy, taking far more partitions than others.
 The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds 
 sorted by toString, which is {{"%s-%d".format(consumer, threadId)}}. This 
 causes each consumer's threads to be adjacent to each other.
 One possible fix would be to define ConsumerThreadId.hashCode, and sort by 
 that.
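
A minimal sketch of the suggested ordering change, using a simplified stand-in for ConsumerThreadId (field names here are assumptions, not the real class):

{code}
// Simplified stand-in for kafka.consumer.ConsumerThreadId; field names are assumptions.
case class ConsumerThreadId(consumer: String, threadId: Int) {
  override def toString: String = "%s-%d".format(consumer, threadId)
}

object RoundRobinOrderingSketch {
  // Sorting by toString keeps all of one consumer's threads adjacent, which is
  // what clusters the assignment; sorting by hashCode interleaves consumers.
  def interleaved(threadIds: Seq[ConsumerThreadId]): Seq[ConsumerThreadId] =
    threadIds.sortBy(_.hashCode)
}
{code}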



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367275#comment-14367275
 ] 

Jiangjie Qin commented on KAFKA-2029:
-

+1 on this in general. The current design has several issues, including data loss 
during leader migration if acks is not set to -1.
In the long term it is definitely worth refactoring the controller. Though I still 
think adding a timeout for putting a message into the controller-to-broker queue is 
dangerous and could lead to inconsistent states among brokers. We can discuss 
the solutions to the problems you mentioned:
For 1) and 2), they seem to be introduced by making the controller-to-broker 
message queue bounded again.
For 3), I checked the latest trunk; the shutdown thread waits for the auto leader 
rebalance thread outside of the controller lock. Does this problem still stand?
For 4) and 5), good points, and +1.
Personally, I think prioritizing controller-to-broker messages is a simpler 
approach to achieving fast leader migration and mitigating data loss. 
4) and 5) are also worth doing.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

 Controlled shutdown as implemented currently can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely using 
 kill -9 and start it again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in setups with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds and it is worth 
 considering a global refactoring of the controller (make it single threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes), the broker sends multiple 
 shutdown requests and finally considers the shutdown failed and proceeds to an 
 unclean shutdown, and the controller gets stuck for a while (holding a lock while 
 waiting for free space in the controller-to-broker queue). After the broker comes 
 back up it receives follower requests and erases high watermarks (with a message 
 that the replica does not exist - the controller hadn't yet sent a request with 
 the replica assignment); then, when the controller starts replicas on the broker, 
 it deletes all local data (due to the missing high watermarks). Furthermore, the 
 controller starts processing the pending shutdown request and stops replicas on 
 the broker, leaving it in a non-functional state. A solution to the problem might 
 be to increase the time the broker waits for the controller's response to the 
 shutdown request, but this timeout is taken from controller.socket.timeout.ms 
 which is global for all broker-controller communication, and setting it to 30 
 minutes is dangerous. 
 *Proposed solution: introduce a dedicated config parameter for this timeout 
 with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
 message to the dead broker's queue, the send thread is in an infinite loop trying 
 to retry the message to the dead broker, and the broker failure handler is waiting 
 for the lock). There are numerous partitions without a leader and the only way 
 out is to kill -9 the controller. *Proposed solution: add a timeout for adding a 
 message to the broker's queue*. ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse,
                   callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings the controller into a completely non-functional state
             // (Controller to broker state change requests batch is not empty while creating a new one)
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>

Re: Review Request 32199: Patch for KAFKA-1858

2015-03-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32199/#review76896
---


This will definitely make the test more stable. I am just thinking: does it make 
sense to add a non-daemon thread check in each test? So we can make sure there 
are no daemon threads left over from some tests.

- Jiangjie Qin


On March 18, 2015, 11:12 a.m., Adrian Preston wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32199/
 ---
 
 (Updated March 18, 2015, 11:12 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1858
 https://issues.apache.org/jira/browse/KAFKA-1858
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1858. Better checking for threads being cleaned up
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 b46daa436231d5aa5c1e2992fd5c2d9a73a30c80 
 
 Diff: https://reviews.apache.org/r/32199/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Adrian Preston
 




Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Andrii Biletskyi
Yes,
I was talking about TopicMetadataRequest.scala (TMR), which is MetadataRequest
in the Java definitions.

None of the above-mentioned messages (StopReplica etc.) are required in
KIP-4.
Sorry if I misled anyone.
Sorry, if I misled someone.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 5:42 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Sorry, Andrii, I'm a bit confused.

 The following request/response pairs are currently not implemented in
 the client:
 StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown.

 Does KIP-4 require any of these?

 Also, what's TMR?

 Gwen

 On Wed, Mar 18, 2015 at 4:07 AM, Andrii Biletskyi
 andrii.bilets...@stealth.ly wrote:
  Gwen,
 
  Thanks for bringing this up!
  Regarding UpdateMetadata in KIP-4 - no it shouldn't be used in Admin CLI,
  its internal server message. We will probably use TMR there (depends how
  generic re-routing facility goes) but TMR is already used in
 NetworkClient,
  so
  I believe there are no doubts about it, it should be ported to java.
 
  Thanks,
  Andrii Biletskyi
 
 
  On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
  I think those two requests are only used by controller to broker
  communication. Not sure if client side will need them in KIP-4,
 unlikely I
  guess.
 
  Jiangjie (Becket) Qin
 
  On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:
 
  Hi,
  
  I'm starting this thread for the various questions I run into while
  refactoring the server to use client requests and responses.
  
  Help is appreciated :)
  
  First question: LEADER_AND_ISR request and STOP_REPLICA request are
  unimplemented in the client.
  
  Do we want to implement them as part of this refactoring?
  Or should we continue using the scala implementation for those?
  
  Gwen
 
 



[jira] [Commented] (KAFKA-1912) Create a simple request re-routing facility

2015-03-18 Thread Andrii Biletskyi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367331#comment-14367331
 ] 

Andrii Biletskyi commented on KAFKA-1912:
-

[~jkreps] thanks for adding the code.

A few questions / comments:
1. Regarding the RequestRerouter shim: if we store and re-route unsent messages in 
a separate thread, outside the main io thread pool, wouldn't it blur the 
num.io.threads setting? With this facility in place there will be a 
RequestHandlerPool bounded by num.io.threads plus a message queue with a worker 
managing un-routed messages and unsent responses to clients.

2. On the message queue: if we bombard some broker with requests which require 
re-routing to the controller, wouldn't it be strange for the client to receive a 
TimeoutException because the _broker_ wasn't able to process it, rather than the 
controller, which is the real request executor?

3. Is it possible for NetworkClient (on the client side) to know whether the target 
broker's message queue is full and thus the broker may delay re-routing? Consider 
a high-priority admin task (like altering a topic's config) while the target 
broker is busy with produce / fetch requests.

4. This was mentioned by [~guozhang] in the mailing list. Let me add this 
question too:
the broker instance which carries the current controller is agnostic of its 
existence, and uses KafkaApis to handle general Kafka requests. Having all admin 
requests redirected to the controller instance will force the broker to be 
aware of its carried controller, and to access its internal data for handling 
these requests. Plus, with the number of clients out of Kafka's control, 
this may easily cause the controller to become a hot spot in terms of request 
load.

 Create a simple request re-routing facility
 ---

 Key: KAFKA-1912
 URL: https://issues.apache.org/jira/browse/KAFKA-1912
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
 Fix For: 0.8.3


 We are accumulating a lot of requests that have to be directed to the correct 
 server. This makes sense for high volume produce or fetch requests. But it is 
 silly to put the extra burden on the client for the many miscellaneous 
 requests such as fetching or committing offsets and so on.
 This adds a ton of practical complexity to the clients with little or no 
 payoff in performance.
 We should add a generic request-type agnostic re-routing facility on the 
 server. This would allow any server to accept a request and forward it to the 
 correct destination, proxying the response back to the user. Naturally it 
 needs to do this without blocking the thread.
 The result is that a client implementation can choose to be optimally 
 efficient and manage a local cache of cluster state and attempt to always 
 direct its requests to the proper server OR it can choose simplicity and just 
 send things all to a single host and let that host figure out where to 
 forward it.
 I actually think we should implement this more or less across the board, but 
 some requests such as produce and fetch require more logic to proxy since 
 they have to be scattered out to multiple servers and gathered back to create 
 the response. So these could be done in a second phase.
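
To make the idea concrete, a rough sketch of such a non-blocking forwarding shim (all names are hypothetical; the actual facility is still to be designed):

{code}
import java.util.concurrent.LinkedBlockingQueue

// Sketch only: queue requests destined for another broker (e.g. the controller)
// and proxy the responses back to the original client connection, so no io
// thread blocks while the forwarded request is in flight.
class RequestForwarderSketch(sendToBroker: (Int, Array[Byte], Array[Byte] => Unit) => Unit) {

  private case class Pending(targetBrokerId: Int,
                             requestBytes: Array[Byte],
                             respondToClient: Array[Byte] => Unit)

  private val pending = new LinkedBlockingQueue[Pending]()

  // Called from the request handling path when a request should be handled elsewhere.
  def forward(targetBrokerId: Int, requestBytes: Array[Byte],
              respondToClient: Array[Byte] => Unit): Unit =
    pending.put(Pending(targetBrokerId, requestBytes, respondToClient))

  // Run repeatedly by a dedicated worker thread, outside the io thread pool.
  def doWork(): Unit = {
    val p = pending.take()
    // The network layer invokes respondToClient when the remote broker answers.
    sendToBroker(p.targetBrokerId, p.requestBytes, p.respondToClient)
  }
}
{code}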



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367396#comment-14367396
 ] 

Jun Rao edited comment on KAFKA-1927 at 3/18/15 4:18 PM:
-

In order to work on this jira, we will need to merge KAFKA-1841 into trunk, 
which is waiting on KAFKA-1634.


was (Author: junrao):
In order to work on this jira, we will need to merge KAFKA-1841 into trunk, 
which is waiting on KAFA-1634.

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation

2015-03-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367394#comment-14367394
 ] 

Jun Rao commented on KAFKA-1634:


[~guozhang], [~jjkoshy], in order to work on KAFKA-1927, we will need to have 
KAFKA-1841 merged into trunk, which depends on this jira.

 Improve semantics of timestamp in OffsetCommitRequests and update 
 documentation
 ---

 Key: KAFKA-1634
 URL: https://issues.apache.org/jira/browse/KAFKA-1634
 Project: Kafka
  Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Priority: Blocker
 Fix For: 0.8.3

 Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch, 
 KAFKA-1634_2014-11-07_16:54:33.patch, KAFKA-1634_2014-11-17_17:42:44.patch, 
 KAFKA-1634_2014-11-21_14:00:34.patch, KAFKA-1634_2014-12-01_11:44:35.patch, 
 KAFKA-1634_2014-12-01_18:03:12.patch, KAFKA-1634_2015-01-14_15:50:15.patch, 
 KAFKA-1634_2015-01-21_16:43:01.patch, KAFKA-1634_2015-01-22_18:47:37.patch, 
 KAFKA-1634_2015-01-23_16:06:07.patch, KAFKA-1634_2015-02-06_11:01:08.patch


 From the mailing list -
 following up on this -- I think the online API docs for OffsetCommitRequest
 still incorrectly refer to client-side timestamps:
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 Wasn't that removed, and isn't it now always handled server-side?  Would one of
 the devs mind updating the API spec wiki?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31568: Patch for KAFKA-1989

2015-03-18 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76899
---



core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment124592

This function is no longer triggered in the new purgatory; we need to add 
it back since it is instantiated in produce / fetch for recording metrics, etc.



core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment124591

Trying to purge each watcher list in each doWork() call will likely increase 
CPU usage a lot; we can reduce the CPU usage by only purging when watched() is too 
large.


- Guozhang Wang


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated Feb. 28, 2015, 12:14 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 Repository: kafka
 
 
 Description
 ---
 
 new purgatory implementation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/DelayedOperation.scala 
 e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
 7a37617395b9e4226853913b8989f42e7301de7c 
   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31568/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-18 Thread Aditya Auradkar
Is it possible to discuss these 3 options during the next KIP hangout?

Aditya


From: Steven Wu [stevenz...@gmail.com]
Sent: Tuesday, March 17, 2015 10:08 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Jay,

Let's say an app produces to 10 different topics, and one of the topics is sent
from a library. Due to whatever condition/bug, this lib starts to send
messages over the quota. If we go with the delayed-response approach, it
will cause the whole shared RecordAccumulator buffer to fill up. That
will penalize the other 9 topics which are within the quota. That is the
unfairness point that Ewen and I were trying to make.

If the broker just drops the msg and returns an error/status code that indicates the
drop and why, then the producer can just move on and accept the drop. The shared
buffer won't be saturated and the other 9 topics won't be penalized.

Thanks,
Steven



On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Steven,

 It is true that hitting the quota will cause back-pressure on the producer.
 But the solution is simple, a producer that wants to avoid this should stay
 under its quota. In other words this is a contract between the cluster and
 the client, with each side having something to uphold. Quite possibly the
 same thing will happen in the absence of a quota, a client that produces an
 unexpected amount of load will hit the limits of the server and experience
 backpressure. Quotas just allow you to set that same limit at something
 lower than 100% of all resources on the server, which is useful for a
 shared cluster.

 -Jay

 On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote:

  wait. we create one kafka producer for each cluster. each cluster can
 have
  many topics. if producer buffer got filled up due to delayed response for
  one throttled topic, won't that penalize other topics unfairly? it seems
 to
  me that broker should just return error without delay.
 
  sorry that I am chatting to myself :)
 
  On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
   I think I can answer my own question. delayed response will cause the
   producer buffer to be full, which then result in either thread blocking
  or
   message drop.
  
   On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
   please correct me if I am missing sth here. I am not understanding how
   would throttle work without cooperation/back-off from producer. new
 Java
   producer supports non-blocking API. why would delayed response be able
  to
   slow down producer? producer will continue to fire async sends.
  
   On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
   wrote:
  
   I think we are really discussing two separate issues here:
  
   1. Whether we should a) append-then-block-then-returnOKButThrottled
 or
  b)
   block-then-returnFailDuetoThrottled for quota actions on produce
   requests.
  
   Both these approaches assume some kind of well-behaveness of the
  clients:
   option a) assumes the client sets an proper timeout value while can
  just
   ignore OKButThrottled response, while option b) assumes the client
   handles the FailDuetoThrottled appropriately. For any malicious
  clients
   that, for example, just keep retrying either intentionally or not,
   neither
   of these approaches are actually effective.
  
   2. For OKButThrottled and FailDuetoThrottled responses, shall we
   encode
   them as error codes or augment the protocol to use a separate field
   indicating status codes.
  
   Today we have already incorporated some status code as error codes in
  the
   responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of
  this
   is of course using a single field for response status like the HTTP
   status
   codes, while the cons is that it requires clients to handle the error
   codes
   carefully.
  
   I think maybe we can actually extend the single-code approach to
  overcome
   its drawbacks, that is, wrap the error codes semantics to the users
 so
   that
   users do not need to handle the codes one-by-one. More concretely,
   following Jay's example the client could write sth. like this:
  
  
    -

   if (error.isOK())
 // status code is good or the code can be simply ignored for this
 // request type: process the request
   else if (error.needsRetry())
 // throttled, transient error, etc: retry
   else if (error.isFatal())
 // non-retriable errors, etc: notify / terminate / other handling

    -

    Only when the clients really want to handle, for example, the
    FailDuetoThrottled status code specifically, it needs to:

   if (error.isOK())
 // status code is good or the code can be simply ignored for this
 // request type: process the request
   else if (error == FailDuetoThrottled)
 // throttled: log it
   else if (error.needsRetry())
 // transient error, 

[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-18 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367396#comment-14367396
 ] 

Jun Rao commented on KAFKA-1927:


In order to work on this jira, we will need to merge KAFKA-1841 into trunk, 
which is waiting on KAFA-1634.

 Replace requests in kafka.api with requests in 
 org.apache.kafka.common.requests
 ---

 Key: KAFKA-1927
 URL: https://issues.apache.org/jira/browse/KAFKA-1927
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
 Fix For: 0.8.3


 The common package introduced a better way of defining requests using a new 
 protocol definition DSL and also includes wrapper objects for these.
 We should switch KafkaApis over to use these request definitions and consider 
 the scala classes deprecated (we probably need to retain some of them for a 
 while for the scala clients).
 This will be a big improvement because
 1. We will have each request now defined in only one place (Protocol.java)
 2. We will have built-in support for multi-version requests
 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Jun Rao
Andri,

Thanks for the summary.

1. I just realized that in order to start working on KAFKA-1927, we will
need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
This is planned to be done as part of KAFKA-1634. So, we will need Guozhang
and Joel's help to wrap this up.

2. Thinking about this a bit more, if the semantic of those write
requests is async (i.e., after the client gets a response, it just means
that the operation is initiated, but not necessarily completed), we don't
really need to forward the requests to the controller. Instead, the
receiving broker can just write the operation to ZK as the admin command
line tool previously does. This will simplify the implementation.

8. There is another implementation detail for describe topic. Ideally, we
want to read the topic config from the broker cache, instead of ZooKeeper.
Currently, every broker reads the topic-level config for all topics.
However, it ignores those for topics not hosted on itself. So, we may need
to change TopicConfigManager a bit so that it caches the configs for all
topics.
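
As a rough illustration only (hypothetical names, not the actual TopicConfigManager code), such a cache could simply keep configs keyed by topic regardless of whether the topic is hosted locally:

{code}
import java.util.Properties
import scala.collection.concurrent.TrieMap

// Sketch: cache per-topic configs for *all* topics, not only locally hosted ones,
// so a describe-topic request can be answered from the broker cache instead of ZooKeeper.
class TopicConfigCacheSketch {
  private val configs = TrieMap.empty[String, Properties]

  // Invoked when a topic config change notification is processed.
  def update(topic: String, props: Properties): Unit = configs.put(topic, props)

  def get(topic: String): Option[Properties] = configs.get(topic)
}
{code}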

Thanks,

Jun


On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Guys,

 Thanks for a great discussion!
 Here are the actions points:

 1. Q: Get rid of all scala requests objects, use java protocol definitions.
 A: Gwen kindly took that (KAFKA-1927). It's important to speed up
 review procedure
  there since this ticket blocks other important changes.

 2. Q: Generic re-reroute facility vs client maintaining cluster state.
 A: Jay has added pseudo code to KAFKA-1912 - need to consider whether
 this will be
 easy to implement as a server-side feature (comments are
 welcomed!).

 3. Q: Controller field in wire protocol.
 A: This might be useful for clients, add this to TopicMetadataResponse
 (already in KIP).

 4. Q: Decoupling topic creation from TMR.
 A: I will add proposed by Jun solution (using clientId for that) to the
 KIP.

 5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one
 version.
 A: It was decided to try to gather all changes to protocol (before
 release).
 In case of TMR it worth checking: KAFKA-2020 and KIP-13 (quotas)

 6. Q: JSON lib is needed to deserialize user's input in CLI tool.
 A: Use jackson for that, /tools project is a separate jar so shouldn't
 be a big deal.

 7.  Q: VerifyReassingPartitions vs generic status check command.
  A: For long-running requests like reassign partitions *progress* check
 request is useful,
  it makes sense to introduce it.

  Please add, correct me if I missed something.

 Thanks,
 Andrii Biletskyi

 On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Joel,
 
  You are right, I removed ClusterMetadata because we have partially
  what we need in TopicMetadata. Also, as Jay pointed out earlier, we
  would like to have orthogonal API, but at the same time we need
  to be backward compatible.
 
  But I like your idea and even have some other arguments for this option:
  There is also DescribeTopicRequest which was proposed in this KIP,
  it returns topic configs, partitions, replication factor plus partition
  ISR, ASR,
  leader replica. The later part is really already there in
  TopicMetadataRequest.
  So again we'll have to add stuff to TMR, not to duplicate some info in
  newly added requests. However, this way we'll end up with monster
  request which returns cluster metadata, topic replication and config info
  plus partition replication data. Seems logical to split TMR to
  - ClusterMetadata (brokers + controller, maybe smth else)
  - TopicMetadata (topic info + partition details)
  But since current TMR is involved in lots of places (including network
  client,
  as I understand) this might be very serious change and it probably makes
  sense to stick with current approach.
 
  Thanks,
  Andrii Biletskyi
 
 
  On Tue, Mar 17, 2015 at 5:29 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  I may be missing some context but hopefully this will also be covered
  today: I thought the earlier proposal where there was an explicit
  ClusterMetadata request was clearer and explicit. During the course of
  this thread I think the conclusion was that the main need was for
  controller information and that can be rolled into the topic metadata
  response but that seems a bit irrelevant to topic metadata. FWIW I
  think the full broker-list is also irrelevant to topic metadata, but
  it is already there and in use. I think there is still room for an
  explicit ClusterMetadata request since there may be other
  cluster-level information that we may want to add over time (and that
  have nothing to do with topic metadata).
 
  On Tue, Mar 17, 2015 at 02:45:30PM +0200, Andrii Biletskyi wrote:
   Jun,
  
   101. Okay, if you say that such use case is important. I also think
   using clientId for these purposes is fine - if we already have this
  

RE: [DISCUSS] KIP-5 - Broker Configuration Management

2015-03-18 Thread Aditya Auradkar
Hi Andrii,

Thanks for the writeup. 
IMO, we should be able to support dynamically changing certain configs. For 
example: the Quota proposal relies on such a mechanism for changing quotas on 
the fly. 

Aditya

From: Andrii Biletskyi [andrii.bilets...@stealth.ly]
Sent: Tuesday, March 03, 2015 10:30 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-5 - Broker Configuration Management

Hey Jun,

Initially I was thinking that, instead of steps 3-4, we would state that global
config changes take effect only after a broker restart. So it's just:

3-4. On each broker startup apply global config from ZK

In other words, the comprehensive workflow is the following:
1. Issue ChangeGlobalConfigRequest
2. Controller validates / stores config in ZK
(out of scope of this KIP) 3. Do rolling restart for brokers one by one to
pick up
config changes

My understanding is that we won't be able to handle config changes dynamically
as we do for Log config. The main reason, from my point of view, is that
broker config covers settings such as num.io.threads, updating which would
mean gracefully restarting some of the broker's components (in this case the
SocketServer), which, in turn, might be very tricky.

Thanks,
Andrii Biletskyi

On Tue, Mar 3, 2015 at 7:43 PM, Jun Rao j...@confluent.io wrote:

 It seems the proposed workflow is the following.

 1. Client issues a global config update request to the broker.
 2. Broker writes the new config to ZK.
 3. Controller picks up the changes from ZK.
 4. Controller propagates the config changes to all brokers.

 Do we need to add a new request/response to propagate the config changes?

 Also, this logic is a bit different from how per topic config changes
 works: each broker reads from ZK directly. It would be good to make them
 consistent.

 Thanks,

 Jun


 On Wed, Jan 21, 2015 at 10:34 PM, Joe Stein joe.st...@stealth.ly wrote:

  Created a KIP
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-5+-+Broker+Configuration+Management
 
  JIRA https://issues.apache.org/jira/browse/KAFKA-1786
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 



[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector

2015-03-18 Thread Ashwin Jayaprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367361#comment-14367361
 ] 

Ashwin Jayaprakash commented on KAFKA-1716:
---

Sorry for the delay, I will get back to you on this ASAP.

 hang during shutdown of ZookeeperConsumerConnector
 --

 Key: KAFKA-1716
 URL: https://issues.apache.org/jira/browse/KAFKA-1716
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede
 Attachments: after-shutdown.log, before-shutdown.log, 
 kafka-shutdown-stuck.log


 It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to 
 wedge in the case that some consumer fetcher threads receive messages during 
 the shutdown process.
 Shutdown thread:
 {code}-- Parking to wait for: 
 java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
 at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
 at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
 at 
 kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
 at 
 kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
 at 
 scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at 
 scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
 at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
 at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
 at 
 scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at 
 kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
 ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
 at 
 kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
 at 
 kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
 at 
 kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
 ConsumerFetcherThread:
 {code}-- Parking to wait for: 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
 at jrockit/vm/Locks.park0(J)V(Native Method)
 at jrockit/vm/Locks.park(Locks.java:2230)
 at sun/misc/Unsafe.park(ZJ)V(Native Method)
 at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
 at 
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
 at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
 at 
 kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
 at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
 at 
 scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at 
 kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
 at kafka/utils/Utils$.inLock(Utils.scala:538)
 at 
 kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
 at 
 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
Sorry, Andrii, I'm a bit confused.

The following request/response pairs are currently not implemented in
the client:
StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown.

Does KIP-4 require any of these?

Also, what's TMR?

Gwen

On Wed, Mar 18, 2015 at 4:07 AM, Andrii Biletskyi
andrii.bilets...@stealth.ly wrote:
 Gwen,

 Thanks for bringing this up!
 Regarding UpdateMetadata in KIP-4 - no it shouldn't be used in Admin CLI,
 its internal server message. We will probably use TMR there (depends how
 generic re-routing facility goes) but TMR is already used in NetworkClient,
 so
 I believe there are no doubts about it, it should be ported to java.

 Thanks,
 Andrii Biletskyi


 On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

 I think those two requests are only used by controller to broker
 communication. Not sure if client side will need them in KIP-4, unlikely I
 guess.

 Jiangjie (Becket) Qin

 On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,
 
 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.
 
 Help is appreciated :)
 
 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.
 
 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?
 
 Gwen




Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jun Rao
Hi, Gwen,

I was thinking that we will be doing the following in KAFKA-1927.

1. Get the bytes from network.
2. Use a new generic approach to convert bytes into request objects.
2.1 Read the fixed request header (using the util in client).
2.2 Based on the request id in the header, deserialize the rest of the
bytes into a request specific object (using the new java objects).
3. We will then be passing a header and an AbstractRequestResponse to
KafkaApis.

In order to do that, we will need to create similar request/response
objects for internal requests such as StopReplica, LeaderAndIsr,
UpdateMetadata, ControlledShutdown. Not sure whether they should be written
in java or scala, but perhaps they should be only in the core project.

Also note, there are some scala requests/responses used directly in
SimpleConsumer. Since that's our public api, we can't remove those scala
objects until the old consumer is phased out. We can remove the rest of the
scala request objects.

Thanks,

Jun


On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.

 Help is appreciated :)

 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.

 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?

 Gwen



Re: Review Request 31568: Patch for KAFKA-1989

2015-03-18 Thread Yasuhiro Matsuda


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
  https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116
 
  We need to make tickMs and wheelSize configurable.
 
 Yasuhiro Matsuda wrote:
 What is the motivation? I don't think it is a good idea to allow users to 
 configure them.
 
 Guozhang Wang wrote:
 I am not concerning about user-configurability. The purgatory is used by 
 multiple request types: produce, fetch and in the future rebalance, heartbeat 
 and join group, different request type may need to set the tickMs and 
 wheelSize differently.

It is easy to add parameters to DelayedOperationPurgatory since Timer already 
has them. But I don't see any compelling reason to do it now. The hierarchical 
timing wheel is very robust to varying timeout requests by design.


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
  https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187
 
  TBD
 
 Guozhang Wang wrote:
 Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add 
 return Boolean instead of Unit indicating if the task has not expired and 
 successfully added to the timer. And then we can change above as
 
  if (!operation.isComplete()) {
    if (!timeoutTimer.add(operation)) {
      operation.cancel()
    }
  }

An expired task is always successfully added to the timer and executed 
immediately. Do you mean completed instead of expired? TimerTask does not have 
a notion of completion. I kept the completion concept out of the Timer 
implementation since it is not essential to Timer functionality.


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
  https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68
 
  I think bucket.flush(reinsurt) will always fail on all the items since 
  their expiration time will always  bucket expiration + ticketMs, i.e. the 
  returned bucket from the delayed queue has already expired all its items. 
  In this case, could we just call foreach(submit) on all of them instead of 
  trying to reinsurt them?
 
 Yasuhiro Matsuda wrote:
 It is true only for the lowest wheel. Reinsert is necessary to make 
 timing wheels work. A bucket from a higher wheel may contain tasks not 
 expired (a tick time is longer in a higher wheel).
 
 Guozhang Wang wrote:
 OK, I may miss sth. here, but this is my reasoning:
 
 The bucket is only returned from delayed queue in line 62 if its 
 expiration time has passed currentTime, after that at least the lowest wheel 
 will advance to its expiration time, and hence add call within the reinsert 
 is doomed to fail as task.expirationTime  wheel's time + tickMs.

If the expired bucket is from the lowest wheel, all tasks in the bucket are 
expired; reinsert submits the tasks to a thread pool for execution.
If the expired bucket is from a higher wheel, tasks are either expired or not 
expired; reinsert submits the expired tasks to a thread pool and moves the 
unexpired tasks to lower wheels.


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
  https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72
 
  It seems the task entry of the task will only be set once throughout 
  its life time; even when the task entry gets reinsurted its correspondence 
  to the task will not change, right?
  
  If that is true we can just set the entry for the task in the 
  constructor of the task entry.
 
 Yasuhiro Matsuda wrote:
 This sets TimerTaskEntry to TimerTask. TimeTask is created independently 
 from a Timer, then enqueued to a Timer.
 
 Guozhang Wang wrote:
 Yes, but can we move this line to line 119 of TimerTaskList.scala? Then 
 in line 46 of Timer when we create the TimerTaskEntry with the passed in 
 TimerTask its entry field will be set automatically.

If a task already enqueued to a timer is enqueued again, intentionally or 
unintentionally (i.e. a bug), what happens?
My intention here is to keep the data structure consistent in such a case: 
setTimerTaskEntry removes the old entry if it exists.


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated Feb. 28, 2015, 12:14 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
Hi Jun,

I was taking a slightly different approach. Let me know if it makes
sense to you:

1. Get the bytes from network (kinda unavoidable...)
2. Modify RequestChannel.Request to contain header and body (instead
of a single object)
3. Create the header and body from the bytes as follows:
val header: RequestHeader = RequestHeader.parse(buffer)
val apiKey: Int = header.apiKey
val body: Struct =
ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
4. KafkaApis will continue getting RequestChannel.Request, but will
now have access to the body and header separately.
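
A rough sketch of how steps 2-4 could fit together (illustrative names only, not the final code):

{code}
import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.ProtoUtils
import org.apache.kafka.common.protocol.types.Struct
import org.apache.kafka.common.requests.RequestHeader

// Sketch only: parse the common header first, then use its api key to pick the
// request schema and read the body, so KafkaApis can consume them separately.
case class ParsedRequest(header: RequestHeader, body: Struct)

object ParsedRequest {
  def fromBytes(buffer: ByteBuffer): ParsedRequest = {
    val header = RequestHeader.parse(buffer)
    val apiKey: Int = header.apiKey
    val body = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
    ParsedRequest(header, body)
  }
}
{code}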

I agree that I need Request/Response objects that contain only the
body for all request types.
I'm thinking of implementing them in o.a.k.common.requests in Java for
consistency.

When we are discussing the requests/responses used in SimpleConsumer,
we mean everything used in javaapi, right?

Gwen



On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
 Hi, Gwen,

 I was thinking that we will be doing the following in KAFKA-1927.

 1. Get the bytes from network.
 2. Use a new generic approach to convert bytes into request objects.
 2.1 Read the fixed request header (using the util in client).
 2.2 Based on the request id in the header, deserialize the rest of the
 bytes into a request specific object (using the new java objects).
 3. We will then be passing a header and an AbstractRequestResponse to
 KafkaApis.

 In order to do that, we will need to create similar request/response
 objects for internal requests such as StopReplica, LeaderAndIsr,
 UpdateMetadata, ControlledShutdown. Not sure whether they should be written
 in java or scala, but perhaps they should be only in the core project.

 Also note, there are some scala requests/responses used directly in
 SimpleConsumer. Since that's our public api, we can't remove those scala
 objects until the old consumer is phased out. We can remove the rest of the
 scala request objects.

 Thanks,

 Jun


 On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.

 Help is appreciated :)

 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.

 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?

 Gwen



Re: Review Request 32193: Patch for KAFKA-1997

2015-03-18 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32193/#review76910
---



core/src/main/scala/kafka/tools/MirrorMaker.scala
https://reviews.apache.org/r/32193/#comment124623

Could we use ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ... etc. 
to replace the hard-coded strings?
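
For example, something along these lines (a sketch, assuming the standard new-producer config constants):

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

object ProducerConfigExample {
  val producerProps = new Properties()
  // Named constant instead of the raw string "max.in.flight.requests.per.connection".
  producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
}
{code}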


- Guozhang Wang


On March 18, 2015, 3:40 a.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32193/
 ---
 
 (Updated March 18, 2015, 3:40 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1997
 https://issues.apache.org/jira/browse/KAFKA-1997
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Follow-up patch for KAFKA-1997, fix a few bugs.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 11acc3103e4e4a30d7380e26355ccba09b3304bb 
 
 Diff: https://reviews.apache.org/r/32193/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Gwen Shapira
On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote:
 Andri,

 Thanks for the summary.

 1. I just realized that in order to start working on KAFKA-1927, we will
 need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
 This is planned to be done as part of KAFKA-1634. So, we will need Guozhang
 and Joel's help to wrap this up.

I mentioned this in a separate thread, but it may be more relevant here:
It looks like the SimpleConsumer API exposes TopicMetadataRequest and
TopicMetadataResponse.
This means that KAFKA-1927 doesn't remove this duplication.

So I'm not sure we actually need KAFKA-1927 before implementing this KIP.
This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it
means we can proceed in parallel?

 2. Thinking about this a bit more, if the semantic of those write
 requests is async (i.e., after the client gets a response, it just means
 that the operation is initiated, but not necessarily completed), we don't
 really need to forward the requests to the controller. Instead, the
 receiving broker can just write the operation to ZK as the admin command
 line tool previously does. This will simplify the implementation.

 8. There is another implementation detail for describe topic. Ideally, we
 want to read the topic config from the broker cache, instead of ZooKeeper.
 Currently, every broker reads the topic-level config for all topics.
 However, it ignores those for topics not hosted on itself. So, we may need
 to change TopicConfigManager a bit so that it caches the configs for all
 topics.

 Thanks,

 Jun


 On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

 Guys,

 Thanks for a great discussion!
 Here are the actions points:

 1. Q: Get rid of all scala requests objects, use java protocol definitions.
 A: Gwen kindly took that (KAFKA-1927). It's important to speed up
 review procedure
  there since this ticket blocks other important changes.

 2. Q: Generic re-route facility vs client maintaining cluster state.
 A: Jay has added pseudo code to KAFKA-1912 - need to consider whether
 this will be
 easy to implement as a server-side feature (comments are
 welcomed!).

 3. Q: Controller field in wire protocol.
 A: This might be useful for clients, add this to TopicMetadataResponse
 (already in KIP).

 4. Q: Decoupling topic creation from TMR.
 A: I will add proposed by Jun solution (using clientId for that) to the
 KIP.

 5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one
 version.
 A: It was decided to try to gather all changes to protocol (before
 release).
 In case of TMR it's worth checking: KAFKA-2020 and KIP-13 (quotas)

 6. Q: JSON lib is needed to deserialize user's input in CLI tool.
 A: Use jackson for that, /tools project is a separate jar so shouldn't
 be a big deal.

 7.  Q: VerifyReassignPartitions vs generic status check command.
  A: For long-running requests like reassign partitions *progress* check
 request is useful,
  it makes sense to introduce it.

  Please add, correct me if I missed something.

 Thanks,
 Andrii Biletskyi

 On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Joel,
 
  You are right, I removed ClusterMetadata because we have partially
  what we need in TopicMetadata. Also, as Jay pointed out earlier, we
  would like to have orthogonal API, but at the same time we need
  to be backward compatible.
 
  But I like your idea and even have some other arguments for this option:
  There is also DescribeTopicRequest which was proposed in this KIP,
  it returns topic configs, partitions, replication factor plus partition
  ISR, ASR,
  leader replica. The latter part is really already there in
  TopicMetadataRequest.
  So again we'll have to add stuff to TMR so as not to duplicate some info in
  newly added requests. However, this way we'll end up with monster
  request which returns cluster metadata, topic replication and config info
  plus partition replication data. Seems logical to split TMR to
  - ClusterMetadata (brokers + controller, maybe smth else)
  - TopicMetadata (topic info + partition details)
  But since current TMR is involved in lots of places (including network
  client,
  as I understand) this might be a very serious change and it probably makes
  sense to stick with current approach.
 
  Thanks,
  Andrii Biletskyi
 
 
  On Tue, Mar 17, 2015 at 5:29 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  I may be missing some context but hopefully this will also be covered
  today: I thought the earlier proposal where there was an explicit
  ClusterMetadata request was clearer and explicit. During the course of
  this thread I think the conclusion was that the main need was for
  controller information and that can be rolled into the topic metadata
  response but that seems a bit irrelevant to topic metadata. FWIW I
  think the full broker-list is also irrelevant to 

[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names

2015-03-18 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367878#comment-14367878
 ] 

Guozhang Wang commented on KAFKA-1481:
--

We hit an issue related to this ticket, which adds the brokerHost / 
brokerPort into FetchRequestAndResponseMetrics. The root cause is that when the 
server starts up, it gets the local host string by calling:

{code}
InetAddress.getLocalHost.getCanonicalHostName
{code}

which will possibly just return the textual representation of the IP address 
if accessing the local hostname is somehow not allowed:

http://docs.oracle.com/javase/7/docs/api/java/net/InetAddress.html#getCanonicalHostName%28%29

In our case, the IPv6 address string is returned, which is registered in ZK, 
read by the controller and propagated to brokers through metadata updates, and 
eventually read by consumers. When that happens, we got the following error:

{code}
2015-03-18 09:46:30 JmxReporter [WARN] Error processing 
kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=samza_checkpoint_manager-wikipedia_parser-1-1426697189628-0,brokerHost=fe80:0:0:0:7ed1:c3ff:fee0:c60f%4,brokerPort=9092
javax.management.MalformedObjectNameException: Invalid character ':' in value 
part of property
at javax.management.ObjectName.construct(ObjectName.java:618)
at javax.management.ObjectName.&lt;init&gt;(ObjectName.java:1382)
at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
at 
com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
at com.yammer.metrics.core.MetricsRegistry.newTimer(MetricsRegistry.java:320)
at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
at 
kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
at 
kafka.consumer.FetchRequestAndResponseMetrics.&lt;init&gt;(FetchRequestAndResponseStats.scala:35)
at 
kafka.consumer.FetchRequestAndResponseStats$$anonfun$1.apply(FetchRequestAndResponseStats.scala:44)
at 
kafka.consumer.FetchRequestAndResponseStats$$anonfun$1.apply(FetchRequestAndResponseStats.scala:44)
at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
at 
kafka.consumer.FetchRequestAndResponseStats.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:51)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
at 
org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:283)
at 
org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:256)
at 
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
at 
org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:255)
at 
org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:242)
at 
org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:136)
at 
org.apache.samza.coordinator.JobCoordinator.buildJobModel(JobCoordinator.scala)
at 
org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController.refreshOwnership(StandaloneZkCoordinatorController.java:161)
at 
org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController.access$900(StandaloneZkCoordinatorController.java:49)
at 
org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController$ContainerPathListener.handleChildChange(StandaloneZkCoordinatorController.java:256)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}

I think the right solution here is that the broker host string should also be 
canonicalized/sanitized before being added to the sensor tags. [~vladimir.tretyakov] 
[~junrao] what do you think?
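
As a rough illustration of that idea (sanitizeBrokerHost is a hypothetical helper, 
not existing Kafka code), the host could be cleaned up before it is used as a tag:

{code}
// Hypothetical sketch: strip characters that are illegal in JMX ObjectName
// property values (':' and '%' appear in IPv6 literals such as
// fe80:0:0:0:7ed1:c3ff:fee0:c60f%4) before using the host as a metric tag.
def sanitizeBrokerHost(host: String): String =
  host.replace(':', '_').replace('%', '_')
{code}

Quoting the value via javax.management.ObjectName.quote would be another option, 
depending on how readable we want the resulting MBean names to be.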

 Stop using dashes AND underscores as separators in MBean names
 --

 Key: KAFKA-1481
 URL: https://issues.apache.org/jira/browse/KAFKA-1481
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.1.1
Reporter: Otis Gospodnetic
Assignee: Vladimir Tretyakov
Priority: Critical
  Labels: patch
 Fix For: 0.8.2.0, 0.8.3

 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, 
 KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, 
 KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, 
 KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, 
 KAFKA-1481_2014-10-31_14-35-43.patch, 
 KAFKA-1481_2014-11-03_16-39-41_doc.patch, 
 KAFKA-1481_2014-11-03_17-02-23.patch, 
 KAFKA-1481_2014-11-10_20-39-41_doc.patch, 
 KAFKA-1481_2014-11-10_21-02-23.patch, 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Andrii Biletskyi
Joel,

I'm totally behind your arguments concerning adding irrelevant stuff to
TopicMetadataRequest. And also about having a bloated request.

Personally I'd go with a separate ClusterMetadataRequest (CMR), actually
this was our initial proposal. But since the second part of the request -
brokers
is already present in TopicMetadataResponse (TMR) I agreed to augment
TMR instead of introducing a separate request.

The only thing which should be considered though is kafka producer /
consumer.
If we split TMR to topic metadata and cluster metadata (brokers +
controller) we
need to think whether it's okay if clients would have to issue two separate
requests to maintain Metadata.java (in terms of potential concurrency
issue).
Can someone please clarify this question?

Thanks,
Andrii Biletskyi


On Wed, Mar 18, 2015 at 8:58 PM, Joel Koshy jjkosh...@gmail.com wrote:

 (Thanks Andrii for the summary)

 For (1) yes we will circle back on that shortly after syncing up in
 person. I think it is close to getting committed although development
 for KAFKA-1927 can probably begin without it.

 There is one more item we covered at the hangout. i.e., whether we
 want to add the coordinator to the topic metadata response or provide
 a clearer ClusterMetadataRequest.

 There are two reasons I think we should try and avoid adding the
 field:
 - It is irrelevant to topic metadata
 - If we finally do request rerouting in Kafka then the field would add
   little to no value. (It still helps to have a separate
   ClusterMetadataRequest to query for cluster-wide information such as
   'which broker is the controller?' as Joe mentioned.)

 I think it would be cleaner to have an explicit ClusterMetadataRequest
 that you can send to any broker in order to obtain the controller (and
 in the future possibly other cluster-wide information). I think the
 main argument against doing this and instead adding it to the topic
 metadata response was convenience - i.e., you don't have to discover
 the controller in advance. However, I don't see much actual
 benefit/convenience in this and in fact think it is a non-issue. Let
 me know if I'm overlooking something here.

 As an example, say we need to initiate partition reassignment by
 issuing the new ReassignPartitionsRequest to the controller (assume we
 already have the desired manual partition assignment).  If we are to
 augment topic metadata response then the flow would be something like this:

 - Issue topic metadata request to any broker (and discover the
   controller)
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request to the controller.

 With an explicit cluster metadata request it would be:
 - Issue cluster metadata request to any broker
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request

 So it seems to add little practical value and bloats topic metadata
 response with an irrelevant detail.

 The other angle to this is the following - is it a matter of naming?
 Should we just rename topic metadata request/response to just
 MetadataRequest/Response and add cluster metadata to it? By that same
 token should we also allow querying for the consumer coordinator (and
 in future transaction coordinator) as well? This leads to a bloated
 request which isn't very appealing and altogether confusing.

 Thanks,

 Joel

 On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
  Andrii,
 
  Thanks for the summary.
 
  1. I just realized that in order to start working on KAFKA-1927, we will
  need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
  This is planned to be done as part of KAFKA-1634. So, we will need
 Guozhang
  and Joel's help to wrap this up.
 
  2. Thinking about this a bit more, if the semantic of those write
  requests is async (i.e., after the client gets a response, it just means
  that the operation is initiated, but not necessarily completed), we don't
  really need to forward the requests to the controller. Instead, the
  receiving broker can just write the operation to ZK as the admin command
  line tool previously does. This will simplify the implementation.
 
  8. There is another implementation detail for describe topic. Ideally, we
  want to read the topic config from the broker cache, instead of
 ZooKeeper.
  Currently, every broker reads the topic-level config for all topics.
  However, it ignores those for topics not hosted on itself. So, we may
 need
  to change TopicConfigManager a bit so that it caches the configs for all
  topics.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Guys,
  
   Thanks for a great discussion!
   Here are the actions points:
  
   1. Q: Get rid of all scala requests objects, use java protocol
 definitions.
   A: Gwen kindly took that (KAFKA-1927). It's important to speed up
   review procedure
  

[jira] [Created] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-18 Thread Dmitry Bugaychenko (JIRA)
Dmitry Bugaychenko created KAFKA-2029:
-

 Summary: Improving controlled shutdown for rolling updates
 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical


Controlled shutdown as implemented currently can cause numerous problems: 
deadlocks, local and global data loss, partitions without a leader, etc. In some 
cases the only way to restore the cluster is to stop it completely using kill -9 
and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are 
extremely painful in setups with large brokers (36 disks, 1000+ assigned 
partitions per broker in our case).

The problems are:

# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
shutdown requests and finally considers the controlled shutdown failed and 
proceeds to an unclean shutdown, while the controller gets stuck for a while 
(holding a lock while waiting for free space in the controller-to-broker queue). 
After the broker starts back up it receives follower requests and erases its 
highwatermarks (with a message that the replica does not exist - the controller 
hadn't yet sent a request with the replica assignment); then, when the controller 
starts replicas on the broker, it deletes all local data (due to the missing 
highwatermarks). Furthermore, the controller starts processing the pending 
shutdown request and stops replicas on the broker, leaving it in a non-functional 
state. A solution might be to increase the time the broker waits for the 
controller's response to the shutdown request, but this timeout is taken from 
controller.socket.timeout.ms, which is global for all broker-controller 
communication, and setting it to 30 minutes is dangerous. *Proposed solution: 
introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the 
controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
message to the dead broker's queue, the send thread is in an infinite loop trying 
to retry the message to the dead broker, and the broker failure handler is waiting 
for the lock). There are numerous partitions without a leader and the only way out 
is to kill -9 the controller. *Proposed solution: add a timeout for adding messages 
to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
  def sendRequest(brokerId: Int, request: RequestOrResponse,
                  callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: prevent an infinite hang on trying to send a message
          // to a dead broker.
          // TODO: Move timeout to config
          if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
            error("Timed out trying to send message to broker " + brokerId.toString)
            // Do not throw, as it brings the controller into a completely
            // non-functional state ("Controller to broker state change requests
            // batch is not empty while creating a new one")
            //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
{code}
# When the broker that is the controller starts to shut down while auto leader 
rebalance is running, it deadlocks in the end (the shutdown thread owns the lock 
and waits for the rebalance thread to exit, and the rebalance thread waits for the 
lock). *Proposed solution: use a bounded wait in the rebalance thread*. 
KafkaController.scala:
{code}
  // ODKL Patch to prevent deadlocks in shutdown.
  /**
   * Execute the given function inside the lock
   */
  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    if (isRunning || lock.isHeldByCurrentThread) {
      // TODO: Configure timeout.
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
      }
      try {
        return fun
      } finally {
        lock.unlock()
      }
    } else {
      throw new IllegalStateException("Controller is not running, not allowed to lock.")
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
    // Use inLockIfRunning here instead of inLock
  }
{code}
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector act in 
a way that they prefer the oldest replica in the ISR (the one that joined the ISR 
first). In the case of a rolling update this means moving partitions to the tail, which

[jira] [Updated] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-18 Thread Dmitry Bugaychenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Bugaychenko updated KAFKA-2029:
--
Description: 
Controlled shutdown as implemented currently can cause numerous problems: 
deadlocks, local and global data loss, partitions without a leader, etc. In some 
cases the only way to restore the cluster is to stop it completely using kill -9 
and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are 
extremely painful in setups with large brokers (36 disks, 1000+ assigned 
partitions per broker in our case).

Note 3: These improvements are actually workarounds and it is worth considering a 
global refactoring of the controller (make it single-threaded, or even get rid of 
it in favour of ZK leader elections for partitions).

The problems are:

# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
shutdown requests and finally considers the controlled shutdown failed and 
proceeds to an unclean shutdown, while the controller gets stuck for a while 
(holding a lock while waiting for free space in the controller-to-broker queue). 
After the broker starts back up it receives follower requests and erases its 
highwatermarks (with a message that the replica does not exist - the controller 
hadn't yet sent a request with the replica assignment); then, when the controller 
starts replicas on the broker, it deletes all local data (due to the missing 
highwatermarks). Furthermore, the controller starts processing the pending 
shutdown request and stops replicas on the broker, leaving it in a non-functional 
state. A solution might be to increase the time the broker waits for the 
controller's response to the shutdown request, but this timeout is taken from 
controller.socket.timeout.ms, which is global for all broker-controller 
communication, and setting it to 30 minutes is dangerous. *Proposed solution: 
introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the 
controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
message to the dead broker's queue, the send thread is in an infinite loop trying 
to retry the message to the dead broker, and the broker failure handler is waiting 
for the lock). There are numerous partitions without a leader and the only way out 
is to kill -9 the controller. *Proposed solution: add a timeout for adding messages 
to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
  def sendRequest(brokerId: Int, request: RequestOrResponse,
                  callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: prevent an infinite hang on trying to send a message
          // to a dead broker.
          // TODO: Move timeout to config
          if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
            error("Timed out trying to send message to broker " + brokerId.toString)
            // Do not throw, as it brings the controller into a completely
            // non-functional state ("Controller to broker state change requests
            // batch is not empty while creating a new one")
            //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
{code}
# When the broker that is the controller starts to shut down while auto leader 
rebalance is running, it deadlocks in the end (the shutdown thread owns the lock 
and waits for the rebalance thread to exit, and the rebalance thread waits for the 
lock). *Proposed solution: use a bounded wait in the rebalance thread*. 
KafkaController.scala:
{code}
  // ODKL Patch to prevent deadlocks in shutdown.
  /**
   * Execute the given function inside the lock
   */
  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    if (isRunning || lock.isHeldByCurrentThread) {
      // TODO: Configure timeout.
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
      }
      try {
        return fun
      } finally {
        lock.unlock()
      }
    } else {
      throw new IllegalStateException("Controller is not running, not allowed to lock.")
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
    // Use inLockIfRunning here instead of inLock
  }
{code}
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector act in 
a way that they prefer the oldest replica in the ISR (the one that joined the ISR 
first). In the case of a rolling update this means moving partitions to the tail, which 
increases the overall amount of movements and finally

[jira] [Updated] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-03-18 Thread Dmitry Bugaychenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Bugaychenko updated KAFKA-2029:
--
Description: 
Controlled shutdown as implemented currently can cause numerous problems: 
deadlocks, local and global data loss, partitions without a leader, etc. In some 
cases the only way to restore the cluster is to stop it completely using kill -9 
and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are 
extremely painful in setups with large brokers (36 disks, 1000+ assigned 
partitions per broker in our case).

Note 3: These improvements are actually workarounds and it is worth considering a 
global refactoring of the controller (make it single-threaded, or even get rid of 
it in favour of ZK leader elections for partitions).

The problems and improvements are:

# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
shutdown requests and finally considers the controlled shutdown failed and 
proceeds to an unclean shutdown, while the controller gets stuck for a while 
(holding a lock while waiting for free space in the controller-to-broker queue). 
After the broker starts back up it receives follower requests and erases its 
highwatermarks (with a message that the replica does not exist - the controller 
hadn't yet sent a request with the replica assignment); then, when the controller 
starts replicas on the broker, it deletes all local data (due to the missing 
highwatermarks). Furthermore, the controller starts processing the pending 
shutdown request and stops replicas on the broker, leaving it in a non-functional 
state. A solution might be to increase the time the broker waits for the 
controller's response to the shutdown request, but this timeout is taken from 
controller.socket.timeout.ms, which is global for all broker-controller 
communication, and setting it to 30 minutes is dangerous. *Proposed solution: 
introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the 
controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
message to the dead broker's queue, the send thread is in an infinite loop trying 
to retry the message to the dead broker, and the broker failure handler is waiting 
for the lock). There are numerous partitions without a leader and the only way out 
is to kill -9 the controller. *Proposed solution: add a timeout for adding messages 
to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
  def sendRequest(brokerId: Int, request: RequestOrResponse,
                  callback: (RequestOrResponse) => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          // ODKL Patch: prevent an infinite hang on trying to send a message
          // to a dead broker.
          // TODO: Move timeout to config
          if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
            error("Timed out trying to send message to broker " + brokerId.toString)
            // Do not throw, as it brings the controller into a completely
            // non-functional state ("Controller to broker state change requests
            // batch is not empty while creating a new one")
            //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
          }
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
{code}
# When the broker that is the controller starts to shut down while auto leader 
rebalance is running, it deadlocks in the end (the shutdown thread owns the lock 
and waits for the rebalance thread to exit, and the rebalance thread waits for the 
lock). *Proposed solution: use a bounded wait in the rebalance thread*. 
KafkaController.scala:
{code}
  // ODKL Patch to prevent deadlocks in shutdown.
  /**
   * Execute the given function inside the lock
   */
  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    if (isRunning || lock.isHeldByCurrentThread) {
      // TODO: Configure timeout.
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
      }
      try {
        return fun
      } finally {
        lock.unlock()
      }
    } else {
      throw new IllegalStateException("Controller is not running, not allowed to lock.")
    }
  }

  private def checkAndTriggerPartitionRebalance(): Unit = {
    // Use inLockIfRunning here instead of inLock
  }
{code}
# Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector act in 
a way that they prefer the oldest replica in the ISR (the one that joined the ISR 
first). In the case of a rolling update this means moving partitions to the tail, which 
increases the overall amount of

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Jay Kreps
I'm +1 on Jun's suggestion as long as it can work for all the requests.

On Wed, Mar 18, 2015 at 3:35 PM, Jun Rao j...@confluent.io wrote:

 Andrii,

 I think we agreed on the following.

 (a) Admin requests can be sent to and handled by any broker.
 (b) Admin requests are processed asynchronously, at least for now. That is,
 when the client gets a response, it just means that the request is
 initiated, but not necessarily completed. Then, it's up to the client to
 issue another request to check the status for completion.

 To support (a), we were thinking of doing request forwarding to the
 controller (utilizing KAFKA-1912). I am making an alternative proposal.
 Basically, the broker can just write to ZooKeeper to inform the controller
 about the request. For example, to handle partitionReassignment, the broker
 will just write the requested partitions to /admin/reassign_partitions
 (like what AdminUtils currently does) and then send a response to the
 client. This shouldn't take long and the implementation will be simpler
 than forwarding the requests to the controller through RPC.
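
For illustration, a minimal sketch of that first-cut approach (assuming the
existing ZkUtils helpers and the same JSON payload the reassignment CLI writes
today; initiateReassignment is an illustrative name, not existing code):

    import kafka.utils.ZkUtils
    import org.I0Itec.zkclient.ZkClient

    // The receiving broker serializes the requested assignment and creates
    // /admin/reassign_partitions, exactly like the admin CLI does, then
    // answers the client immediately; the controller picks it up from ZK.
    def initiateReassignment(zkClient: ZkClient, reassignmentJson: String): Unit =
      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, reassignmentJson)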

 Thanks,

 Jun


 On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:

  Jun,
 
  I might be wrong but didn't we agree we will let any broker from the
  cluster handle *long-running* admin requests (at this time
 preferredReplica
  and
  reassignPartitions), via zk admin path. Thus CreateTopics etc should be
  sent
  only to the controller.
 
  Thanks,
  Andrii Biletskyi
 
  On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:
 
   Joel, Andrii,
  
   I think we agreed that those admin requests can be issued to any
 broker.
   Because of that, there doesn't seem to be a strong need to know the
   controller. So, perhaps we can proceed by not making any change to the
   format of TMR right now. When we start using create topic request in
 the
   producer, we will need a new version of TMR that doesn't trigger auto
  topic
   creation. But that can be done later.
  
   As a first cut implementation, I think the broker can just write to ZK
   directly for
    createTopic/alterTopic/reassignPartitions/preferredLeaderElection
   requests, instead of forwarding them to the controller. This will
  simplify
   the implementation on the broker side.
  
   Thanks,
  
   Jun
  
  
   On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
(Thanks Andrii for the summary)
   
For (1) yes we will circle back on that shortly after syncing up in
person. I think it is close to getting committed although development
for KAFKA-1927 can probably begin without it.
   
There is one more item we covered at the hangout. i.e., whether we
want to add the coordinator to the topic metadata response or provide
a clearer ClusterMetadataRequest.
   
There are two reasons I think we should try and avoid adding the
field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka then the field would
 add
  little to no value. (It still helps to have a separate
  ClusterMetadataRequest to query for cluster-wide information such
 as
  'which broker is the controller?' as Joe mentioned.)
   
I think it would be cleaner to have an explicit
 ClusterMetadataRequest
that you can send to any broker in order to obtain the controller
 (and
in the future possibly other cluster-wide information). I think the
main argument against doing this and instead adding it to the topic
metadata response was convenience - i.e., you don't have to discover
the controller in advance. However, I don't see much actual
benefit/convenience in this and in fact think it is a non-issue. Let
me know if I'm overlooking something here.
   
As an example, say we need to initiate partition reassignment by
issuing the new ReassignPartitionsRequest to the controller (assume
 we
already have the desired manual partition assignment).  If we are to
augment topic metadata response then the flow be something like this
 :
   
- Issue topic metadata request to any broker (and discover the
  controller
- Connect to controller if required (i.e., if the broker above !=
  controller)
- Issue the partition reassignment request to the controller.
   
With an explicit cluster metadata request it would be:
- Issue cluster metadata request to any broker
- Connect to controller if required (i.e., if the broker above !=
  controller)
- Issue the partition reassignment request
   
So it seems to add little practical value and bloats topic metadata
response with an irrelevant detail.
   
The other angle to this is the following - is it a matter of naming?
Should we just rename topic metadata request/response to just
MetadataRequest/Response and add cluster metadata to it? By that same
token should we also allow querying for the consumer coordinator (and

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
We can't rip them out completely, unfortunately - the SimpleConsumer uses them.

So we'll need conversion at some point. I'll try to make the
conversion point just before hitting a public API that we can't
modify, and hopefully it won't look too arbitrary.
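
For concreteness, a rough sketch of such a conversion utility (object and method
names are made up; it assumes Broker and Node both expose id/host/port and a
usable Broker constructor):

    import kafka.cluster.Broker
    import org.apache.kafka.common.Node

    object BrokerConversions {
      // hypothetical helpers to bridge the old and new infrastructure objects
      def toNode(broker: Broker): Node = new Node(broker.id, broker.host, broker.port)
      def toBroker(node: Node): Broker = new Broker(node.id, node.host, node.port)
    }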



On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps jay.kr...@gmail.com wrote:
 I think either approach is okay in the short term. However our goal should
 be to eventually get rid of that duplicate code, so if you are up for just
 ripping and cutting that may get us there sooner.

 -Jay

 On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks!

 Another clarification:
 The Common request/responses use slightly different infrastructure
 objects: Node instead of Broker, TopicPartition instead of
 TopicAndPartition and few more.

 I can write utilities to convert Node to Broker to minimize the scope
 of the change.
 Or I can start replacing Brokers with Nodes across the board.

 I'm currently taking the second approach - i.e, if MetadataRequest is
 now returning Node, I'm changing the entire line of dependencies to
 use Nodes instead of broker.

 Is this acceptable, or do we want to take a more minimal approach for
 this patch and do a larger replacement as a follow up?

 Gwen




 On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Great.
 
  For (3) yeah I think we should just think through the end-to-end pattern
  for these versioned requests since it seems like we will have a number of
  them. The serialization code used as you described gets us to the right
  Struct which the user would then wrap in something like ProduceRequest.
  Presumably there would just be one ProduceRequest that would internally
  fill in things like null or otherwise adapt the struct to a usable
 object.
  On the response side we would have the version from the request to use
 for
   correct versioning. One question is whether this is enough or whether we
  need to have switches in KafkaApis to do things like
 if(produceRequest.version == 3)
 // do something
 else
// do something else
 
  Basically it would be good to be able to write a quick wiki that was like
  how to add or modify a kafka api that explained the right way to do all
  this.
 
  I don't think any of this necessarily blocks this ticket since at the
  moment we don't have tons of versions of requests out there.
 
  -Jay
 
  On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  See inline responses:
 
  On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
   Hey Gwen,
  
   This makes sense to me.
  
   A couple of thoughts, mostly confirming what you said I think:
  
  1. Ideally we would move completely over to the new style of
 request
  definition for server-side processing, even for the internal
  requests. This
  way all requests would have the same header/body struct stuff. As
 you
  say
  for the internal requests we can just delete the scala code. For
 the
  old
  clients they will continue to use their old request definitions
 until
  we
  eol them. I would propose that new changes will go only into the
 new
  request/response objects and the old scala ones will be permanently
  stuck
  on their current version until discontinued. So after this change
  that old
  scala code could be considered frozen.
 
  SimpleConsumer is obviously stuck with the old request/response.
 
  The Producers can be converted to the common request/response without
  breaking compatibility.
  I think we should do this (even though it requires fiddling with
  additional network serialization code), just so we can throw the old
  ProduceRequest away.
 
  Does that make sense?
 
 
  2. I think it would be reasonable to keep all the requests under
  common,
  even though as you point out there is currently no use for some of
  them
  beyond broker-to-broker communication at the moment.
 
  Yep.
 
  3. We should think a little about how versioning will work. Making
  this
  convenient on the server side is an important goal for the new
 style
  of
  request definition. At the serialization level we now handle
  versioning but
  the question we should discuss and work out is how this will map to
  the
  request objects (which I assume will remain unversioned).
 
  The way I see it working (I just started on this, so I may have gaps):
 
  * Request header contains the version
  * When we read the request, we use ProtoUtils.requestSchema which
  takes version as a parameter and is responsible to give us the right
  Schema, which we use to read the buffer and get the correct struct.
  * KafkaApis handlers have the header, so they can use it to access the
  correct fields, build the correct response, etc.
 
  Does that sound about right?
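
For concreteness, a schematic sketch of that read path (parseBody is an
illustrative name; error handling omitted):

    import java.nio.ByteBuffer
    import org.apache.kafka.common.protocol.ProtoUtils
    import org.apache.kafka.common.protocol.types.Struct

    // Pick the schema for (apiKey, version) taken from the request header,
    // then read the body into the Struct the KafkaApis handler works with.
    def parseBody(apiKey: Int, version: Int, buffer: ByteBuffer): Struct =
      ProtoUtils.requestSchema(apiKey, version).read(buffer).asInstanceOf[Struct]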
 
 
  4. Ideally after this refactoring the network package should not be
  dependent on the individual request 

[jira] [Commented] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0

2015-03-18 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368224#comment-14368224
 ] 

Ewen Cheslack-Postava commented on KAFKA-2030:
--

It looks like the homebrew bottle was compiled with Java 8 and you have a 
different version. The homebrew package isn't maintained here. The packaging 
script that generates the package is at 
https://github.com/Homebrew/homebrew/blob/master/Library/Formula/kafka.rb and 
you'll want to file a bug against that repository. Not sure if homebrew has a 
build_depends in addition to their depends settings, which is probably how 
you'd have to enforce something like building with JDK6 to get full 
compatibility. Not sure which version you're trying to run with, but it looks 
like they are only supporting 1.7+, whereas Kafka can still be built with JDK6 
so even if they fix the bottle to match the formula it's possible it still 
wouldn't work for you.

You might also want to look at: 
https://github.com/Homebrew/homebrew/blob/master/share/doc/homebrew/Bottles.md 
You could probably fix this by just turning off bottles. It'll have to build 
everything from scratch, but should definitely be compatible with whatever Java 
version you're using.

 Creating a topic cause Unsupported major.minor version 52.0
 ---

 Key: KAFKA-2030
 URL: https://issues.apache.org/jira/browse/KAFKA-2030
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 
 1.7.0_55
Reporter: Jeff Fogarty
Priority: Blocker

 I installed Kafka via brew.  
 I attempt the following;
 kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra 
 --partitions 3 --replication-factor 2
 and get
 Error while executing topic command 
 org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor 
 version 52.0
 java.lang.UnsupportedClassVersionError: 
 org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor 
 version 52.0
   at java.lang.ClassLoader.defineClass1(Native Method)
   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
   at 
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
   at kafka.server.OffsetManager$.&lt;init&gt;(OffsetManager.scala:357)
   at kafka.server.OffsetManager$.&lt;clinit&gt;(OffsetManager.scala)
   at kafka.common.Topic$.&lt;init&gt;(Topic.scala:29)
   at kafka.common.Topic$.&lt;clinit&gt;(Topic.scala)
   at 
 kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
   at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
   at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
   at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
   at kafka.admin.TopicCommand.main(TopicCommand.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0

2015-03-18 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-2030.
--
Resolution: Invalid

 Creating a topic cause Unsupported major.minor version 52.0
 ---

 Key: KAFKA-2030
 URL: https://issues.apache.org/jira/browse/KAFKA-2030
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 
 1.7.0_55
Reporter: Jeff Fogarty
Priority: Blocker

 I installed Kafka via brew.  
 I attempt the following;
 kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra 
 --partitions 3 --replication-factor 2
 and get
 Error while executing topic command 
 org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor 
 version 52.0
 java.lang.UnsupportedClassVersionError: 
 org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor 
 version 52.0
   at java.lang.ClassLoader.defineClass1(Native Method)
   at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
   at 
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
   at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
   at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
   at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
   at java.security.AccessController.doPrivileged(Native Method)
   at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
   at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
   at kafka.server.OffsetManager$.&lt;init&gt;(OffsetManager.scala:357)
   at kafka.server.OffsetManager$.&lt;clinit&gt;(OffsetManager.scala)
   at kafka.common.Topic$.&lt;init&gt;(Topic.scala:29)
   at kafka.common.Topic$.&lt;clinit&gt;(Topic.scala)
   at 
 kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
   at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
   at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
   at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
   at kafka.admin.TopicCommand.main(TopicCommand.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-7 Security - IP Filtering

2015-03-18 Thread Jun Rao
The proposal sounds reasonable. Timing wise, since we plan to refactor the
network layer code in the broker, perhaps this can wait until KAFKA-1928 is
done?

Thanks,

Jun

On Tue, Mar 17, 2015 at 6:56 AM, Jeff Holoman jholo...@cloudera.com wrote:

 bump

 On Tue, Mar 3, 2015 at 8:12 PM, Jeff Holoman jholo...@cloudera.com
 wrote:

  Guozhang,
 
  The way the patch is implemented, the check is done in the acceptor
 thread
  accept() method of the Socket Server, just before connectionQuotas.
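
For reference, a rough sketch of where such a check could sit (ipFilter and
maybeAccept are illustrative placeholders, not the actual patch):

    import java.net.InetAddress
    import java.nio.channels.SocketChannel

    // Inside the acceptor's accept() handling: reject the connection before
    // any quota bookkeeping or processor hand-off happens.
    def maybeAccept(channel: SocketChannel, ipFilter: InetAddress => Boolean): Boolean = {
      val remote = channel.socket.getInetAddress
      if (!ipFilter(remote)) {
        channel.close()   // drop filtered connections immediately
        false
      } else {
        true              // continue with the normal accept path
      }
    }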
 
  Thanks
 
  Jeff
 
  On Tue, Mar 3, 2015 at 7:59 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Jeff,
 
  I am wondering if the IP filtering rule can be enforced at the socket
  server level instead of the Kafka API level?
 
  Guozhang
 
  On Tue, Mar 3, 2015 at 2:24 PM, Jiangjie Qin j...@linkedin.com.invalid
 
  wrote:
 
   +1 (non-binding)
  
   On 3/3/15, 1:17 PM, Gwen Shapira gshap...@cloudera.com wrote:
  
   +1 (non-binding)
   
   On Tue, Mar 3, 2015 at 12:44 PM, Jeff Holoman jholo...@cloudera.com
 
   wrote:
Details in the wiki.
   
   
   
   
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-7+-+Security+-+IP+F
   iltering
   
   
   
--
Jeff Holoman
Systems Engineer
  
  
 
 
  --
  -- Guozhang
 
 
 
 
  --
  Jeff Holoman
  Systems Engineer
 
 
 
 


 --
 Jeff Holoman
 Systems Engineer



[jira] [Created] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.

2015-03-18 Thread Manikandan Narayanaswamy (JIRA)
Manikandan Narayanaswamy created KAFKA-2031:
---

 Summary: Executing scripts that invoke kafka-run-class.sh results 
in 'permission denied to create log dir' warning.
 Key: KAFKA-2031
 URL: https://issues.apache.org/jira/browse/KAFKA-2031
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 0.8.1.1
Reporter: Manikandan Narayanaswamy
Priority: Minor


The kafka-run-class.sh script expects the LOG_DIR variable to be set; if it is 
not set, it defaults the log dir to $base_dir/logs. In the event that the user 
executing the script does not have the right permissions, this leads to errors 
such as:
{noformat}
mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission denied.
{noformat}

One way to make this more configurable would be to introduce a kafka-env.sh file. 
Sourcing this file would export LOG_DIR and potentially other variables if 
needed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jay Kreps
I think either approach is okay in the short term. However our goal should
be to eventually get rid of that duplicate code, so if you are up for just
ripping and cutting that may get us there sooner.

-Jay

On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks!

 Another clarification:
 The Common request/responses use slightly different infrastructure
 objects: Node instead of Broker, TopicPartition instead of
 TopicAndPartition and few more.

 I can write utilities to convert Node to Broker to minimize the scope
 of the change.
 Or I can start replacing Brokers with Nodes across the board.

 I'm currently taking the second approach - i.e, if MetadataRequest is
 now returning Node, I'm changing the entire line of dependencies to
 use Nodes instead of broker.

 Is this acceptable, or do we want to take a more minimal approach for
 this patch and do a larger replacement as a follow up?

 Gwen




 On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Great.
 
  For (3) yeah I think we should just think through the end-to-end pattern
  for these versioned requests since it seems like we will have a number of
  them. The serialization code used as you described gets us to the right
  Struct which the user would then wrap in something like ProduceRequest.
  Presumably there would just be one ProduceRequest that would internally
  fill in things like null or otherwise adapt the struct to a usable
 object.
  On the response side we would have the version from the request to use
 for
   correct versioning. One question is whether this is enough or whether we
  need to have switches in KafkaApis to do things like
 if(produceRequest.version == 3)
 // do something
 else
// do something else
 
  Basically it would be good to be able to write a quick wiki that was like
  how to add or modify a kafka api that explained the right way to do all
  this.
 
  I don't think any of this necessarily blocks this ticket since at the
  moment we don't have tons of versions of requests out there.
 
  -Jay
 
  On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  See inline responses:
 
  On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
   Hey Gwen,
  
   This makes sense to me.
  
   A couple of thoughts, mostly confirming what you said I think:
  
  1. Ideally we would move completely over to the new style of
 request
  definition for server-side processing, even for the internal
  requests. This
  way all requests would have the same header/body struct stuff. As
 you
  say
  for the internal requests we can just delete the scala code. For
 the
  old
  clients they will continue to use their old request definitions
 until
  we
  eol them. I would propose that new changes will go only into the
 new
  request/response objects and the old scala ones will be permanently
  stuck
  on their current version until discontinued. So after this change
  that old
  scala code could be considered frozen.
 
  SimpleConsumer is obviously stuck with the old request/response.
 
  The Producers can be converted to the common request/response without
  breaking compatibility.
  I think we should do this (even though it requires fiddling with
  additional network serialization code), just so we can throw the old
  ProduceRequest away.
 
  Does that make sense?
 
 
  2. I think it would be reasonable to keep all the requests under
  common,
  even though as you point out there is currently no use for some of
  them
  beyond broker-to-broker communication at the moment.
 
  Yep.
 
  3. We should think a little about how versioning will work. Making
  this
  convenient on the server side is an important goal for the new
 style
  of
  request definition. At the serialization level we now handle
  versioning but
  the question we should discuss and work out is how this will map to
  the
  request objects (which I assume will remain unversioned).
 
  The way I see it working (I just started on this, so I may have gaps):
 
  * Request header contains the version
  * When we read the request, we use ProtoUtils.requestSchema which
  takes version as a parameter and is responsible to give us the right
  Schema, which we use to read the buffer and get the correct struct.
  * KafkaApis handlers have the header, so they can use it to access the
  correct fields, build the correct response, etc.
 
  Does that sound about right?
 
 
  4. Ideally after this refactoring the network package should not be
  dependent on the individual request objects. The intention is that
  stuff in
  kafka.network is meant to be generic network infrastructure that
  doesn't
  know about the particular fetch/produce apis we have implemented on
  top.
 
  I'll make a note to validate that this is the case.
 
  
   -Jay
  
   On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira 

[jira] [Commented] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.

2015-03-18 Thread Manikandan Narayanaswamy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368285#comment-14368285
 ] 

Manikandan Narayanaswamy commented on KAFKA-2031:
-

Great! Thanks for resolving this one.

 Executing scripts that invoke kafka-run-class.sh results in 'permission 
 denied to create log dir' warning.
 --

 Key: KAFKA-2031
 URL: https://issues.apache.org/jira/browse/KAFKA-2031
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 0.8.1.1
Reporter: Manikandan Narayanaswamy
Priority: Minor

 Kafka-run-class.sh script expects LOG_DIR variable to be set, and if this 
 variable is not set, it defaults log dir to $base_dir/logs. And in the event 
 the executor of the script does not have the right permissions, it would lead 
 to errors such as:
 {noformat}
 mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission 
 denied.
 {noformat}
 Proposing one way to make this more configurable is by introducing 
 kafka-env.sh. Sourcing this file would export LOG_DIR and potentially other 
 variables if need be.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1684) Implement TLS/SSL authentication

2015-03-18 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-1684:
-

Assignee: Sriharsha Chintalapani  (was: Ivan Lyutov)

 Implement TLS/SSL authentication
 

 Key: KAFKA-1684
 URL: https://issues.apache.org/jira/browse/KAFKA-1684
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Affects Versions: 0.9.0
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
 Attachments: KAFKA-1684.patch, KAFKA-1684.patch


 Add an SSL port to the configuration and advertise this as part of the 
 metadata request.
 If the SSL port is configured the socket server will need to add a second 
 Acceptor thread to listen on it. Connections accepted on this port will need 
 to go through the SSL handshake prior to being registered with a Processor 
 for request processing.
 SSL requests and responses may need to be wrapped or unwrapped using the 
 SSLEngine that was initialized by the acceptor. This wrapping and unwrapping 
 is very similar to what will need to be done for SASL-based authentication 
 schemes. We should have a uniform interface that covers both of these and we 
 will need to store the instance in the session with the request. The socket 
 server will have to use this object when reading and writing requests. We 
 will need to take care with the FetchRequests as the current 
 FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we 
 can only use this optimization for unencrypted sockets that don't require 
 userspace translation (wrapping).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-18 Thread Aditya Auradkar
Hey everyone,

Thanks for the great discussion. There are currently a few points on this KIP 
that need addressing and I want to make sure we are on the same page about 
those.

1. Append and delay response vs delay and return error
- I think we've discussed the pros and cons of each approach but haven't chosen 
an approach yet. Where does everyone stand on this issue? 

2. Metrics Migration and usage in quotas
- The metrics library in clients has a notion of quotas that we should reuse. 
For that to happen, we need to migrate the server to the new metrics package.
- Need more clarification on how to compute throttling time and windowing for 
quotas.

I'm going to start a new KIP to discuss metrics migration separately. That will 
also contain a section on quotas.

3. Dynamic Configuration management - Being discussed in KIP-5. Basically we 
need something that will model default quotas and allow per-client overrides.

Is there something else that I'm missing?

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Wednesday, March 18, 2015 2:10 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Steven,

The current proposal is actually to enforce quotas at the
client/application level, NOT the topic level. So if you have a service
with a few dozen instances the quota is against all of those instances
added up across all their topics. So actually the effect would be the same
either way but throttling gives the producer the choice of either blocking
or dropping.

-Jay

On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

 Jay,

 let's say an app produces to 10 different topics. one of the topics is sent
 from a library. due to whatever condition/bug, this lib starts to send
 messages over the quota. if we go with the delayed response approach, it
 will cause the whole shared RecordAccumulator buffer to be filled up. that
 will penalize other 9 topics who are within the quota. that is the
 unfairness point that Ewen and I were trying to make.

 if broker just drop the msg and return an error/status code indicates the
 drop and why. then producer can just move on and accept the drop. shared
 buffer won't be saturated and other 9 topics won't be penalized.

 Thanks,
 Steven



 On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Steven,
 
  It is true that hitting the quota will cause back-pressure on the
 producer.
  But the solution is simple, a producer that wants to avoid this should
 stay
  under its quota. In other words this is a contract between the cluster
 and
  the client, with each side having something to uphold. Quite possibly the
  same thing will happen in the absence of a quota, a client that produces
 an
  unexpected amount of load will hit the limits of the server and
 experience
  backpressure. Quotas just allow you to set that same limit at something
  lower than 100% of all resources on the server, which is useful for a
  shared cluster.
 
  -Jay
 
  On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
   wait. we create one kafka producer for each cluster. each cluster can
  have
   many topics. if producer buffer got filled up due to delayed response
 for
   one throttled topic, won't that penalize other topics unfairly? it
 seems
  to
   me that broker should just return error without delay.
  
   sorry that I am chatting to myself :)
  
   On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
I think I can answer my own question. delayed response will cause the
producer buffer to be full, which then result in either thread
 blocking
   or
message drop.
   
On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
   wrote:
   
please correct me if I am missing sth here. I am not understanding
 how
would throttle work without cooperation/back-off from producer. new
  Java
producer supports non-blocking API. why would delayed response be
 able
   to
slow down producer? producer will continue to fire async sends.
   
On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
 
wrote:
   
I think we are really discussing two separate issues here:
   
1. Whether we should a) append-then-block-then-returnOKButThrottled
  or
   b)
block-then-returnFailDuetoThrottled for quota actions on produce
requests.
   
Both these approaches assume some kind of well-behavedness of the
   clients:
option a) assumes the client sets a proper timeout value while it can
   just
ignore OKButThrottled response, while option b) assumes the
 client
handles the FailDuetoThrottled appropriately. For any malicious
   clients
that, for example, just keep retrying either intentionally or not,
neither
of these approaches are actually effective.
   
2. For OKButThrottled and FailDuetoThrottled responses, shall
 we
encode
them as error codes or augment the 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jay Kreps
Great.

For (3) yeah I think we should just think through the end-to-end pattern
for these versioned requests since it seems like we will have a number of
them. The serialization code used as you described gets us to the right
Struct which the user would then wrap in something like ProduceRequest.
Presumably there would just be one ProduceRequest that would internally
fill in things like null or otherwise adapt the struct to a usable object.
On the response side we would have the version from the request to use for
correct versioning. One question is whether this is enough or whether we
need to have switches in KafkaApis to do things like
   if (produceRequest.version == 3)
     // do something
   else
     // do something else
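
A slightly fuller, purely illustrative sketch of that dispatch, assuming the handler receives the parsed header alongside the body struct (the Struct-based ProduceRequest constructor is an assumption here):

    import org.apache.kafka.common.protocol.types.Struct
    import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}

    // Illustrative only: branch on the api version carried in the request header.
    def handleProducerRequest(header: RequestHeader, body: Struct): Unit = {
      val produce = new ProduceRequest(body)   // wrap the raw struct in a usable object
      if (header.apiVersion >= 3) {
        // read fields that only exist in newer versions and build the matching response
      } else {
        // fall back to the older behaviour
      }
    }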

Basically it would be good to be able to write a quick wiki that was like
how to add or modify a kafka api that explained the right way to do all
this.

I don't think any of this necessarily blocks this ticket since at the
moment we don't have tons of versions of requests out there.

-Jay

On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

 See inline responses:

 On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Gwen,
 
  This makes sense to me.
 
  A couple of thoughts, mostly confirming what you said I think:
 
 1. Ideally we would move completely over to the new style of request
 definition for server-side processing, even for the internal
 requests. This
 way all requests would have the same header/body struct stuff. As you
 say
 for the internal requests we can just delete the scala code. For the
 old
 clients they will continue to use their old request definitions until
 we
 eol them. I would propose that new changes will go only into the new
 request/response objects and the old scala ones will be permanently
 stuck
 on their current version until discontinued. So after this change
 that old
 scala code could be considered frozen.

 SimpleConsumer is obviously stuck with the old request/response.

 The Producers can be converted to the common request/response without
 breaking compatibility.
 I think we should do this (even though it requires fiddling with
 additional network serialization code), just so we can throw the old
 ProduceRequest away.

 Does that make sense?


 2. I think it would be reasonable to keep all the requests under
 common,
 even though as you point out there is currently no use for some of
 them
 beyond broker-to-broker communication at the moment.

 Yep.

 3. We should think a little about how versioning will work. Making
 this
 convenient on the server side is an important goal for the new style
 of
 request definition. At the serialization level we now handle
 versioning but
 the question we should discuss and work out is how this will map to
 the
 request objects (which I assume will remain unversioned).

 The way I see it working (I just started on this, so I may have gaps):

 * Request header contains the version
 * When we read the request, we use ProtoUtils.requestSchema which
 takes version as a parameter and is responsible to give us the right
 Schema, which we use to read the buffer and get the correct struct.
 * KafkaApis handlers have the header, so they can use it to access the
 correct fields, build the correct response, etc.

 Does that sound about right?


 4. Ideally after this refactoring the network package should not be
 dependent on the individual request objects. The intention is that
 stuff in
 kafka.network is meant to be generic network infrastructure that
 doesn't
 know about the particular fetch/produce apis we have implemented on
 top.

 I'll make a note to validate that this is the case.

 
  -Jay
 
  On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  Hi Jun,
 
  I was taking a slightly different approach. Let me know if it makes
  sense to you:
 
  1. Get the bytes from network (kinda unavoidable...)
  2. Modify RequestChannel.Request to contain header and body (instead
  of a single object)
  3. Create the header and body from bytes as follows:
  val header: RequestHeader = RequestHeader.parse(buffer)
  val apiKey: Int = header.apiKey
  val body: Struct =
 
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  4. KafkaAPIs will continue getting RequestChannel.Request, but will
  now have access to body and header separately.
 
  I agree that I need a Request/Response objects that contain only the
  body for all requests objects.
  I'm thinking of implementing them in o.a.k.Common.Requests in Java for
  consistency.
 
  When we are discussing the requests/responses used in SimpleConsumer,
  we mean everything used in javaapi, right?
 
  Gwen
 
 
 
  On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
   Hi, Gwen,
  
   I was thinking that we will be doing the following in KAFKA-1927.
  
   1. Get the bytes from 

Build failed in Jenkins: KafkaPreCommit #40

2015-03-18 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/40/changes

Changes:

[wangguoz] KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs 
right

--
[...truncated 3501 lines...]
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.api.test.ProducerCompressionTest.setUp(ProducerCompressionTest.scala:54)

java.lang.NullPointerException
at 
kafka.api.test.ProducerCompressionTest.tearDown(ProducerCompressionTest.scala:60)

kafka.network.SocketServerTest  simpleRequest PASSED

kafka.network.SocketServerTest  tooBigRequestIsRejected PASSED

kafka.network.SocketServerTest  testNullResponse PASSED

kafka.network.SocketServerTest  testSocketsCloseOnShutdown PASSED

kafka.network.SocketServerTest  testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest  testMaxConnectionsPerIPOverrides PASSED

kafka.log.OffsetIndexTest  truncate PASSED

kafka.log.OffsetIndexTest  randomLookupTest PASSED

kafka.log.OffsetIndexTest  lookupExtremeCases PASSED

kafka.log.OffsetIndexTest  appendTooMany PASSED

kafka.log.OffsetIndexTest  appendOutOfOrder PASSED

kafka.log.OffsetIndexTest  testReopen PASSED

kafka.log.OffsetMapTest  testBasicValidation PASSED

kafka.log.OffsetMapTest  testClear PASSED

kafka.log.CleanerTest  testCleanSegments PASSED

kafka.log.CleanerTest  testCleaningWithDeletes PASSED

kafka.log.CleanerTest  testCleaningWithUnkeyedMessages PASSED

kafka.log.CleanerTest  testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest  testSegmentGrouping PASSED

kafka.log.CleanerTest  testBuildOffsetMap PASSED

kafka.log.FileMessageSetTest  testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest  testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest  testSizeInBytes PASSED

kafka.log.FileMessageSetTest  testWriteTo PASSED

kafka.log.FileMessageSetTest  testFileSize PASSED

kafka.log.FileMessageSetTest  testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest  testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest  testRead PASSED

kafka.log.FileMessageSetTest  testSearch PASSED

kafka.log.FileMessageSetTest  testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest  testTruncate PASSED

kafka.log.LogTest  testTimeBasedLogRoll PASSED

kafka.log.LogTest  testTimeBasedLogRollJitter PASSED

kafka.log.LogTest  testSizeBasedLogRoll PASSED

kafka.log.LogTest  testLoadEmptyLog PASSED

kafka.log.LogTest  testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest  testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest  testReadAtLogGap PASSED

kafka.log.LogTest  testReadOutOfRange PASSED

kafka.log.LogTest  testLogRolls PASSED

kafka.log.LogTest  testCompressedMessages PASSED

kafka.log.LogTest  testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest  testMessageSetSizeCheck PASSED

kafka.log.LogTest  testCompactedTopicConstraints PASSED

kafka.log.LogTest  testMessageSizeCheck PASSED

kafka.log.LogTest  testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest  testIndexRebuild PASSED

kafka.log.LogTest  testTruncateTo PASSED

kafka.log.LogTest  testIndexResizingAtTruncation PASSED

kafka.log.LogTest  testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest  testReopenThenTruncate PASSED

kafka.log.LogTest  testAsyncDelete PASSED

kafka.log.LogTest  testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest  testAppendMessageWithNullPayload PASSED

kafka.log.LogTest  testCorruptLog PASSED

kafka.log.LogTest  testCleanShutdownFile PASSED

kafka.log.LogTest  testParseTopicPartitionName PASSED

kafka.log.LogTest  testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest  testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[11] PASSED


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Jun Rao
Andrii,

I think we agreed on the following.

(a) Admin requests can be sent to and handled by any broker.
(b) Admin requests are processed asynchronously, at least for now. That is,
when the client gets a response, it just means that the request is
initiated, but not necessarily completed. Then, it's up to the client to
issue another request to check the status for completion.

To support (a), we were thinking of doing request forwarding to the
controller (utilizing KAFKA-1912). I am making an alternative proposal.
Basically, the broker can just write to ZooKeeper to inform the controller
about the request. For example, to handle partitionReassignment, the broker
will just write the requested partitions to /admin/reassign_partitions
(like what AdminUtils currently does) and then send a response to the
client. This shouldn't take long and the implementation will be simpler
than forwarding the requests to the controller through RPC.
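
A rough sketch of what the receiving broker could do for a reassignment request under this proposal, reusing the existing ZK path (the helper names follow what the current admin tooling uses, but treat the details as illustrative):

    import kafka.common.TopicAndPartition
    import kafka.utils.ZkUtils
    import org.I0Itec.zkclient.ZkClient

    // Illustrative: persist the requested assignment and answer the client right away;
    // the controller notices the ZK path and completes the reassignment asynchronously.
    def startReassignment(zkClient: ZkClient, assignment: Map[TopicAndPartition, Seq[Int]]): Unit = {
      val jsonData = ZkUtils.getPartitionReassignmentZkData(assignment)
      ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonData)
    }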

Thanks,

Jun


On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi 
andrii.bilets...@stealth.ly wrote:

 Jun,

 I might be wrong but didn't we agree we will let any broker from the
 cluster handle *long-running* admin requests (at this time preferredReplica
 and
 reassignPartitions), via zk admin path. Thus CreateTopics etc should be
 sent
 only to the controller.

 Thanks,
 Andrii Biletskyi

 On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:

  Joel, Andrii,
 
  I think we agreed that those admin requests can be issued to any broker.
  Because of that, there doesn't seem to be a strong need to know the
  controller. So, perhaps we can proceed by not making any change to the
  format of TMR right now. When we start using create topic request in the
  producer, we will need a new version of TMR that doesn't trigger auto
 topic
  creation. But that can be done later.
 
  As a first cut implementation, I think the broker can just write to ZK
  directly for
  createTopic/alterTopic/reassignPartitions/preferredLeaderElection
  requests, instead of forwarding them to the controller. This will
 simplify
  the implementation on the broker side.
 
  Thanks,
 
  Jun
 
 
  On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
   (Thanks Andrii for the summary)
  
   For (1) yes we will circle back on that shortly after syncing up in
   person. I think it is close to getting committed although development
   for KAFKA-1927 can probably begin without it.
  
   There is one more item we covered at the hangout. i.e., whether we
   want to add the coordinator to the topic metadata response or provide
   a clearer ClusterMetadataRequest.
  
   There are two reasons I think we should try and avoid adding the
   field:
   - It is irrelevant to topic metadata
   - If we finally do request rerouting in Kafka then the field would add
 little to no value. (It still helps to have a separate
 ClusterMetadataRequest to query for cluster-wide information such as
 'which broker is the controller?' as Joe mentioned.)
  
   I think it would be cleaner to have an explicit ClusterMetadataRequest
   that you can send to any broker in order to obtain the controller (and
   in the future possibly other cluster-wide information). I think the
   main argument against doing this and instead adding it to the topic
   metadata response was convenience - i.e., you don't have to discover
   the controller in advance. However, I don't see much actual
   benefit/convenience in this and in fact think it is a non-issue. Let
   me know if I'm overlooking something here.
  
   As an example, say we need to initiate partition reassignment by
   issuing the new ReassignPartitionsRequest to the controller (assume we
   already have the desired manual partition assignment).  If we are to
   augment topic metadata response then the flow would be something like this:
  
   - Issue topic metadata request to any broker (and discover the
 controller
   - Connect to controller if required (i.e., if the broker above !=
 controller)
   - Issue the partition reassignment request to the controller.
  
   With an explicit cluster metadata request it would be:
   - Issue cluster metadata request to any broker
   - Connect to controller if required (i.e., if the broker above !=
 controller)
   - Issue the partition reassignment request
  
   So it seems to add little practical value and bloats topic metadata
   response with an irrelevant detail.
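
For what it's worth, a separate request could also stay very small. A purely illustrative response layout, written with the existing protocol type helpers (none of these field names are in the KIP):

    import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Type}

    // Illustrative layout for a ClusterMetadataResponse: the live brokers plus the controller id.
    val BrokerV0 = new Schema(
      new Field("node_id", Type.INT32),
      new Field("host", Type.STRING),
      new Field("port", Type.INT32))

    val ClusterMetadataResponseV0 = new Schema(
      new Field("controller_id", Type.INT32),
      new Field("brokers", new ArrayOf(BrokerV0)))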
  
   The other angle to this is the following - is it a matter of naming?
   Should we just rename topic metadata request/response to just
   MetadataRequest/Response and add cluster metadata to it? By that same
   token should we also allow querying for the consumer coordinator (and
   in future transaction coordinator) as well? This leads to a bloated
   request which isn't very appealing and altogether confusing.
  
   Thanks,
  
   Joel
  
   On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
Andrii,
   
Thanks for the 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Joel Koshy
+1 as well. I think it helps to keep the rerouting approach orthogonal
to this KIP.

On Wed, Mar 18, 2015 at 03:40:48PM -0700, Jay Kreps wrote:
 I'm +1 on Jun's suggestion as long as it can work for all the requests.
 
 On Wed, Mar 18, 2015 at 3:35 PM, Jun Rao j...@confluent.io wrote:
 
  Andrii,
 
  I think we agreed on the following.
 
  (a) Admin requests can be sent to and handled by any broker.
  (b) Admin requests are processed asynchronously, at least for now. That is,
  when the client gets a response, it just means that the request is
  initiated, but not necessarily completed. Then, it's up to the client to
  issue another request to check the status for completion.
 
  To support (a), we were thinking of doing request forwarding to the
  controller (utilizing KAFKA-1912). I am making an alternative proposal.
  Basically, the broker can just write to ZooKeeper to inform the controller
  about the request. For example, to handle partitionReassignment, the broker
  will just write the requested partitions to /admin/reassign_partitions
  (like what AdminUtils currently does) and then send a response to the
  client. This shouldn't take long and the implementation will be simpler
  than forwarding the requests to the controller through RPC.
 
  Thanks,
 
  Jun
 
 
  On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Jun,
  
   I might be wrong but didn't we agree we will let any broker from the
   cluster handle *long-running* admin requests (at this time
  preferredReplica
   and
   reassignPartitions), via zk admin path. Thus CreateTopics etc should be
   sent
   only to the controller.
  
   Thanks,
   Andrii Biletskyi
  
   On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:
  
Joel, Andrii,
   
I think we agreed that those admin requests can be issued to any
  broker.
Because of that, there doesn't seem to be a strong need to know the
controller. So, perhaps we can proceed by not making any change to the
format of TMR right now. When we start using create topic request in
  the
producer, we will need a new version of TMR that doesn't trigger auto
   topic
creation. But that can be done later.
   
As a first cut implementation, I think the broker can just write to ZK
directly for
createTopic/alterTopic/reassignPartitions/preferredLeaderElection
requests, instead of forwarding them to the controller. This will
   simplify
the implementation on the broker side.
   
Thanks,
   
Jun
   
   
On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com
   wrote:
   
 (Thanks Andrii for the summary)

 For (1) yes we will circle back on that shortly after syncing up in
 person. I think it is close to getting committed although development
 for KAFKA-1927 can probably begin without it.

 There is one more item we covered at the hangout. i.e., whether we
 want to add the coordinator to the topic metadata response or provide
 a clearer ClusterMetadataRequest.

 There are two reasons I think we should try and avoid adding the
 field:
 - It is irrelevant to topic metadata
 - If we finally do request rerouting in Kafka then the field would
  add
   little to no value. (It still helps to have a separate
   ClusterMetadataRequest to query for cluster-wide information such
  as
   'which broker is the controller?' as Joe mentioned.)

 I think it would be cleaner to have an explicit
  ClusterMetadataRequest
 that you can send to any broker in order to obtain the controller
  (and
 in the future possibly other cluster-wide information). I think the
 main argument against doing this and instead adding it to the topic
 metadata response was convenience - i.e., you don't have to discover
 the controller in advance. However, I don't see much actual
 benefit/convenience in this and in fact think it is a non-issue. Let
 me know if I'm overlooking something here.

 As an example, say we need to initiate partition reassignment by
 issuing the new ReassignPartitionsRequest to the controller (assume
  we
 already have the desired manual partition assignment).  If we are to
 augment topic metadata response then the flow would be something like this
  :

 - Issue topic metadata request to any broker (and discover the
   controller
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request to the controller.

 With an explicit cluster metadata request it would be:
 - Issue cluster metadata request to any broker
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request

 So it seems to add little practical value and bloats topic metadata
 response with an irrelevant detail.

 The other angle 

[jira] [Resolved] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.

2015-03-18 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani resolved KAFKA-2031.
---
Resolution: Duplicate

 Executing scripts that invoke kafka-run-class.sh results in 'permission 
 denied to create log dir' warning.
 --

 Key: KAFKA-2031
 URL: https://issues.apache.org/jira/browse/KAFKA-2031
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 0.8.1.1
Reporter: Manikandan Narayanaswamy
Priority: Minor

 Kafka-run-class.sh script expects LOG_DIR variable to be set, and if this 
 variable is not set, it defaults log dir to $base_dir/logs. And in the event 
 the executor of the script does not have the right permissions, it would lead 
 to errors such as:
 {noformat}
 mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission 
 denied.
 {noformat}
 Proposing one way to make this more configurable is by introducing 
 kafka-env.sh. Sourcing this file would export LOG_DIR and potentially other 
 variables if need be.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Joel Koshy
Yes that is what I was alluding to when I said that if we finally do
request rerouting in Kafka then the field would add little to no
value. I wasn't sure if we agreed that we _will_ do rerouting or
whether we agreed to evaluate it (KAFKA-1912). Andrii can you update
the KIP with this?

Thanks,

Joel

On Wed, Mar 18, 2015 at 02:55:00PM -0700, Jun Rao wrote:
 Joel, Andrii,
 
 I think we agreed that those admin requests can be issued to any broker.
 Because of that, there doesn't seem to be a strong need to know the
 controller. So, perhaps we can proceed by not making any change to the
 format of TMR right now. When we start using create topic request in the
 producer, we will need a new version of TMR that doesn't trigger auto topic
 creation. But that can be done later.
 
 As a first cut implementation, I think the broker can just write to ZK
 directly for
 createTopic/alterTopic/reassignPartitions/preferredLeaderElection
 requests, instead of forwarding them to the controller. This will simplify
 the implementation on the broker side.
 
 Thanks,
 
 Jun
 
 
 On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:
 
  (Thanks Andrii for the summary)
 
  For (1) yes we will circle back on that shortly after syncing up in
  person. I think it is close to getting committed although development
  for KAFKA-1927 can probably begin without it.
 
  There is one more item we covered at the hangout. i.e., whether we
  want to add the coordinator to the topic metadata response or provide
  a clearer ClusterMetadataRequest.
 
  There are two reasons I think we should try and avoid adding the
  field:
  - It is irrelevant to topic metadata
  - If we finally do request rerouting in Kafka then the field would add
little to no value. (It still helps to have a separate
ClusterMetadataRequest to query for cluster-wide information such as
'which broker is the controller?' as Joe mentioned.)
 
  I think it would be cleaner to have an explicit ClusterMetadataRequest
  that you can send to any broker in order to obtain the controller (and
  in the future possibly other cluster-wide information). I think the
  main argument against doing this and instead adding it to the topic
  metadata response was convenience - i.e., you don't have to discover
  the controller in advance. However, I don't see much actual
  benefit/convenience in this and in fact think it is a non-issue. Let
  me know if I'm overlooking something here.
 
  As an example, say we need to initiate partition reassignment by
  issuing the new ReassignPartitionsRequest to the controller (assume we
  already have the desired manual partition assignment).  If we are to
  augment topic metadata response then the flow would be something like this:
 
  - Issue topic metadata request to any broker (and discover the
controller
  - Connect to controller if required (i.e., if the broker above !=
controller)
  - Issue the partition reassignment request to the controller.
 
  With an explicit cluster metadata request it would be:
  - Issue cluster metadata request to any broker
  - Connect to controller if required (i.e., if the broker above !=
controller)
  - Issue the partition reassignment request
 
  So it seems to add little practical value and bloats topic metadata
  response with an irrelevant detail.
 
  The other angle to this is the following - is it a matter of naming?
  Should we just rename topic metadata request/response to just
  MetadataRequest/Response and add cluster metadata to it? By that same
  token should we also allow querying for the consumer coordinator (and
  in future transaction coordinator) as well? This leads to a bloated
  request which isn't very appealing and altogether confusing.
 
  Thanks,
 
  Joel
 
  On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
   Andrii,
  
   Thanks for the summary.
  
   1. I just realized that in order to start working on KAFKA-1927, we will
   need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
   This is planned to be done as part of KAFKA-1634. So, we will need
  Guozhang
   and Joel's help to wrap this up.
  
   2. Thinking about this a bit more, if the semantic of those write
   requests is async (i.e., after the client gets a response, it just means
   that the operation is initiated, but not necessarily completed), we don't
   really need to forward the requests to the controller. Instead, the
   receiving broker can just write the operation to ZK as the admin command
   line tool previously does. This will simplify the implementation.
  
   8. There is another implementation detail for describe topic. Ideally, we
   want to read the topic config from the broker cache, instead of
  ZooKeeper.
   Currently, every broker reads the topic-level config for all topics.
   However, it ignores those for topics not hosted on itself. So, we may
  need
   to change TopicConfigManager a bit so that it caches the configs for all
   topics.
  
   Thanks,
  

[jira] [Created] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0

2015-03-18 Thread Jeff Fogarty (JIRA)
Jeff Fogarty created KAFKA-2030:
---

 Summary: Creating a topic cause Unsupported major.minor version 
52.0
 Key: KAFKA-2030
 URL: https://issues.apache.org/jira/browse/KAFKA-2030
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 
1.7.0_55
Reporter: Jeff Fogarty
Priority: Blocker


I installed Kafka via brew.  
I attempted the following:
kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra 
--partitions 3 --replication-factor 2
and get
Error while executing topic command 
org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 
52.0
java.lang.UnsupportedClassVersionError: 
org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 
52.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at kafka.server.OffsetManager$.<init>(OffsetManager.scala:357)
at kafka.server.OffsetManager$.<clinit>(OffsetManager.scala)
at kafka.common.Topic$.<init>(Topic.scala:29)
at kafka.common.Topic$.<clinit>(Topic.scala)
at 
kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
at kafka.admin.TopicCommand.main(TopicCommand.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-18 Thread Jay Kreps
I think that is a good summary.

1. I'd favor delay over error--I think we have a design where delay will
basically mimic the same behavior you would get if you had a lower capacity
Kafka cluster all to yourself, which from my point of view is ideal. I'm
aesthetically opposed to delay+error.
2. I actually don't think this is blocked on completing the metrics
migration. I do think we need to figure out the direction, though to decide
what to do.
3. Yup.
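
On the open question of how to compute the throttling time, one simple windowed scheme (purely illustrative, not something the KIP specifies) is to delay just long enough that the observed rate falls back to the quota:

    // Illustrative: given the bytes observed in the current window and a byte-rate quota,
    // delay the response long enough for the effective rate to drop back to the quota.
    def throttleTimeMs(bytesInWindow: Long, windowMs: Long, quotaBytesPerSec: Double): Long = {
      val allowedBytes = quotaBytesPerSec * windowMs / 1000.0
      if (bytesInWindow <= allowedBytes) 0L
      else (((bytesInWindow - allowedBytes) / quotaBytesPerSec) * 1000.0).toLong
    }

With numbers: 15 MB observed in a 1 second window against a 10 MB/s quota gives a 500 ms delay.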

-Jay


On Wed, Mar 18, 2015 at 4:02 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Hey everyone,

 Thanks for the great discussion. There are currently a few points on this
 KIP that need addressing and I want to make sure we are on the same page
 about those.

 1. Append and delay response vs delay and return error
 - I think we've discussed the pros and cons of each approach but haven't
 chosen an approach yet. Where does everyone stand on this issue?

 2. Metrics Migration and usage in quotas
 - The metrics library in clients has a notion of quotas that we should
 reuse. For that to happen, we need to migrate the server to the new metrics
 package.
 - Need more clarification on how to compute throttling time and windowing
 for quotas.

 I'm going to start a new KIP to discuss metrics migration separately. That
 will also contain a section on quotas.

 3. Dynamic Configuration management - Being discussed in KIP-5. Basically
 we need something that will model default quotas and allow per-client
 overrides.

 Is there something else that I'm missing?

 Thanks,
 Aditya
 
 From: Jay Kreps [jay.kr...@gmail.com]
 Sent: Wednesday, March 18, 2015 2:10 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Hey Steven,

 The current proposal is actually to enforce quotas at the
 client/application level, NOT the topic level. So if you have a service
 with a few dozen instances the quota is against all of those instances
 added up across all their topics. So actually the effect would be the same
 either way but throttling gives the producer the choice of either blocking
 or dropping.

 -Jay

 On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

  Jay,
 
  let's say an app produces to 10 different topics. one of the topic is
 sent
  from a library. due to whatever condition/bug, this lib starts to send
  messages over the quota. if we go with the delayed response approach, it
  will cause the whole shared RecordAccumulator buffer to be filled up.
 that
  will penalize other 9 topics who are within the quota. that is the
  unfairness point that Ewen and I were trying to make.
 
  if broker just drop the msg and return an error/status code indicates the
  drop and why. then producer can just move on and accept the drop. shared
  buffer won't be saturated and other 9 topics won't be penalized.
 
  Thanks,
  Steven
 
 
 
  On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Hey Steven,
  
   It is true that hitting the quota will cause back-pressure on the
  producer.
   But the solution is simple, a producer that wants to avoid this should
  stay
   under its quota. In other words this is a contract between the cluster
  and
   the client, with each side having something to uphold. Quite possibly
 the
   same thing will happen in the absence of a quota, a client that
 produces
  an
   unexpected amount of load will hit the limits of the server and
  experience
   backpressure. Quotas just allow you to set that same limit at something
   lower than 100% of all resources on the server, which is useful for a
   shared cluster.
  
   -Jay
  
   On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
wait. we create one kafka producer for each cluster. each cluster can
   have
many topics. if producer buffer got filled up due to delayed response
  for
one throttled topic, won't that penalize other topics unfairly? it
  seems
   to
me that broker should just return error without delay.
   
sorry that I am chatting to myself :)
   
On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
   wrote:
   
 I think I can answer my own question. delayed response will cause
 the
 producer buffer to be full, which then result in either thread
  blocking
or
 message drop.

 On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
wrote:

 please correct me if I am missing sth here. I am not understanding
  how
 would throttle work without cooperation/back-off from producer.
 new
   Java
 producer supports non-blocking API. why would delayed response be
  able
to
 slow down producer? producer will continue to fire async sends.

 On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
 wangg...@gmail.com
  
 wrote:

 I think we are really discussing two separate issues here:

 1. Whether we should a)
 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
Thanks!

Another clarification:
The Common request/responses use slightly different infrastructure
objects: Node instead of Broker, TopicPartition instead of
TopicAndPartition and few more.

I can write utilities to convert Node to Broker to minimize the scope
of the change.
Or I can start replacing Brokers with Nodes across the board.

I'm currently taking the second approach - i.e., if MetadataRequest is
now returning Node, I'm changing the entire line of dependencies to
use Nodes instead of Brokers.

Is this acceptable, or do we want to take a more minimal approach for
this patch and do a larger replacement as a follow up?

Gwen




On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote:
 Great.

 For (3) yeah I think we should just think through the end-to-end pattern
 for these versioned requests since it seems like we will have a number of
 them. The serialization code used as you described gets us to the right
 Struct which the user would then wrap in something like ProduceRequest.
 Presumably there would just be one ProduceRequest that would internally
 fill in things like null or otherwise adapt the struct to a usable object.
 On the response side we would have the version from the request to use for
 correct versioning. One question is whether this is enough or whether we
 need to have switches in KafkaApis to do things like
    if (produceRequest.version == 3)
      // do something
    else
      // do something else

 Basically it would be good to be able to write a quick wiki that was like
 how to add or modify a kafka api that explained the right way to do all
 this.

 I don't think any of this necessarily blocks this ticket since at the
 moment we don't have tons of versions of requests out there.

 -Jay

 On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

 See inline responses:

 On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Gwen,
 
  This makes sense to me.
 
  A couple of thoughts, mostly confirming what you said I think:
 
 1. Ideally we would move completely over to the new style of request
 definition for server-side processing, even for the internal
 requests. This
 way all requests would have the same header/body struct stuff. As you
 say
 for the internal requests we can just delete the scala code. For the
 old
 clients they will continue to use their old request definitions until
 we
 eol them. I would propose that new changes will go only into the new
 request/response objects and the old scala ones will be permanently
 stuck
 on their current version until discontinued. So after this change
 that old
 scala code could be considered frozen.

 SimpleConsumer is obviously stuck with the old request/response.

 The Producers can be converted to the common request/response without
 breaking compatibility.
 I think we should do this (even though it requires fiddling with
 additional network serialization code), just so we can throw the old
 ProduceRequest away.

 Does that make sense?


 2. I think it would be reasonable to keep all the requests under
 common,
 even though as you point out there is currently no use for some of
 them
 beyond broker-to-broker communication at the moment.

 Yep.

 3. We should think a little about how versioning will work. Making
 this
 convenient on the server side is an important goal for the new style
 of
 request definition. At the serialization level we now handle
 versioning but
 the question we should discuss and work out is how this will map to
 the
 request objects (which I assume will remain unversioned).

 The way I see it working (I just started on this, so I may have gaps):

 * Request header contains the version
 * When we read the request, we use ProtoUtils.requestSchema which
 takes version as a parameter and is responsible to give us the right
 Schema, which we use to read the buffer and get the correct struct.
 * KafkaApis handlers have the header, so they can use it to access the
 correct fields, build the correct response, etc.

 Does that sound about right?


 4. Ideally after this refactoring the network package should not be
 dependent on the individual request objects. The intention is that
 stuff in
 kafka.network is meant to be generic network infrastructure that
 doesn't
 know about the particular fetch/produce apis we have implemented on
 top.

 I'll make a note to validate that this is the case.

 
  -Jay
 
  On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  Hi Jun,
 
  I was taking a slightly different approach. Let me know if it makes
  sense to you:
 
  1. Get the bytes from network (kinda unavoidable...)
  2. Modify RequestChannel.Request to contain header and body (instead
  of a single object)
  3. Create the header and body from bytes as follows:
  val header: RequestHeader = RequestHeader.parse(buffer)
  val apiKey: Int = 

[jira] [Commented] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.

2015-03-18 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368239#comment-14368239
 ] 

Sriharsha Chintalapani commented on KAFKA-2031:
---

[~mnarayan] we've a JIRA to add kafka-env.sh here 
https://issues.apache.org/jira/browse/KAFKA-1566

 Executing scripts that invoke kafka-run-class.sh results in 'permission 
 denied to create log dir' warning.
 --

 Key: KAFKA-2031
 URL: https://issues.apache.org/jira/browse/KAFKA-2031
 Project: Kafka
  Issue Type: Bug
  Components: packaging
Affects Versions: 0.8.1.1
Reporter: Manikandan Narayanaswamy
Priority: Minor

 Kafka-run-class.sh script expects LOG_DIR variable to be set, and if this 
 variable is not set, it defaults log dir to $base_dir/logs. And in the event 
 the executor of the script does not have the right permissions, it would lead 
 to errors such as:
 {noformat}
 mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission 
 denied.
 {noformat}
 Proposing one way to make this more configurable is by introducing 
 kafka-env.sh. Sourcing this file would export LOG_DIR and potentially other 
 variables if need be.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31742: Patch for KAFKA-527

2015-03-18 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31742/#review76985
---


Thanks for the patch.

Could we test out the patch with StressTestLog?


core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
https://reviews.apache.org/r/31742/#comment124757

Sometimes, the message size can be large. To avoid memory fragmentation, 
would it be better to cap the segment size to sth like 256K? So the segment 
size will be min(messageSetSize, 64KB)?



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
https://reviews.apache.org/r/31742/#comment124756

Instead of passing in a function, I think it will be easier to understand 
the logic if we just pass in the message list to messageWriter.write(). This 
way, all writes are inside messageWriter, instead of in two places.



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
https://reviews.apache.org/r/31742/#comment124758

Same comment on the segment size as the above.



core/src/main/scala/kafka/message/MessageWriter.scala
https://reviews.apache.org/r/31742/#comment124759

For consistency, size() can just be size.



core/src/main/scala/kafka/message/MessageWriter.scala
https://reviews.apache.org/r/31742/#comment124760

Would it be better to name this ReservedOutput?



core/src/main/scala/kafka/message/MessageWriter.scala
https://reviews.apache.org/r/31742/#comment124744

Perhaps we can rename off to initOffset and offset to currOffset?



core/src/main/scala/kafka/message/MessageWriter.scala
https://reviews.apache.org/r/31742/#comment124743

Shouldn't we test off  b.length and off + len  b.length?



core/src/main/scala/kafka/message/MessageWriter.scala
https://reviews.apache.org/r/31742/#comment124745

Perhaps it's clearer to write this as a do .. while? It feels a bit weird 
that the first statement is adding an amount that is initialized to 0.



core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
https://reviews.apache.org/r/31742/#comment124754

Could we just use Random.nextBytes()?



core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
https://reviews.apache.org/r/31742/#comment124755

Could we explicitly import the implicits where they're applied? That way 
it's clear where they're used.


- Jun Rao


On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31742/
 ---
 
 (Updated March 16, 2015, 10:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-527
 https://issues.apache.org/jira/browse/KAFKA-527
 
 
 Repository: kafka
 
 
 Description
 ---
 
 less byte copies
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
 9c694719dc9b515fb3c3ae96435a87b334044272 
   core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31742/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-03-18 Thread Parth Brahmbhatt
Hi Michael,

Thanks for taking the time to review. Currently I did not plan on adding
“Deny”, but I guess it can’t hurt, except that adding more constructs would
probably make the ACLs more complex.
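
If “Deny” does go in, the evaluation rule itself can stay simple. A tiny sketch with deny taking precedence (the Acl shape below is made up for illustration and is not the KIP-11 interface):

    // Illustrative only: a matching deny always wins over a matching allow.
    case class Acl(principal: String, host: String, resource: String,
                   operation: String, allow: Boolean)

    def authorized(acls: Seq[Acl], principal: String, host: String,
                   resource: String, operation: String): Boolean = {
      def matches(a: Acl) =
        a.principal == principal && a.resource == resource &&
          a.operation == operation && (a.host == "*" || a.host == host)
      val relevant = acls.filter(matches)
      val denied   = relevant.exists(a => !a.allow)
      val allowed  = relevant.exists(a => a.allow)
      allowed && !denied
    }

With the example from the review, “allow P to READ R from all hosts” plus “deny P to READ R from host H1” would permit every host except H1.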

When a topic is created with no acls provided , I was planning to add a
default ACL which would allow access to everyone from all hosts.

I am assuming you are referring to the principal in Acl. The acls were supposed
to be provided in property files and stored in ZK, so I thought it was better
to just refer to a string. We will always be using
session.principal.getName to get the actual principal name.

Thanks
Parth

On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID
wrote:

Hi Parth,

Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as
ALLOW? This can be handy setting up rules that have exceptions. E.g.
“Allow principal P to READ resource R from all hosts” with “Deny principal
P READ access to resource R from host H1” in combination would allow P to
READ R from all hosts *except* H1.

2. When a topic is newly created, will there be an ACL created for it? If
not, would that not deny subsequent access to it?

(nit) Maybe use Principal instead of String to represent principals?


On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote:

Parth

Overall it is looking good. Couple of questions…

- Can you give an example of how the policies will look in the default
implementation?
- In the operations, can we support “CONNECT” also? This can be used
during Session connection.
- Regarding access control for “Topic Creation”, since we can’t do it on
the server side, can we de-scope it for now? And plan it as a future feature
request?

Thanks

Bosco

 

On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote:

Hi Parth,
Thanks for putting this together. Overall it looks good to
me. Although AdminUtils is a concern KIP-4  can probably fix
that part.
Thanks,
Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
 Forgot to add links to wiki and jira.
 
 Link to wiki:
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorizatio
n
+
Interface
 Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
 
 Thanks
 Parth
 
 From: Parth Brahmbhatt
 pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com
 Date: Thursday, March 5, 2015 at 10:33 AM
 To: dev@kafka.apache.orgmailto:dev@kafka.apache.org
 dev@kafka.apache.orgmailto:dev@kafka.apache.org
 Subject: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Hi,
 
 KIP-11 is open for discussion , I have updated the wiki with the
design
 and open questions.
 
 Thanks
 Parth






[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-18 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367645#comment-14367645
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

[~junrao][~jkreps] Can you guys comment on the question about TopicConfig Cache?

 Add authorization interface and naive implementation
 

 Key: KAFKA-1688
 URL: https://issues.apache.org/jira/browse/KAFKA-1688
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Jay Kreps
Assignee: Parth Brahmbhatt
 Fix For: 0.8.3


 Add a PermissionManager interface as described here:
 https://cwiki.apache.org/confluence/display/KAFKA/Security
 (possibly there is a better name?)
 Implement calls to the PermissionsManager in KafkaApis for the main requests 
 (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
 exception to the protocol to indicate permission denied.
 Add a server configuration to give the class you want to instantiate that 
 implements that interface. That class can define its own configuration 
 properties from the main config file.
 Provide a simple implementation of this interface which just takes a user and 
 ip whitelist and permits those in either of the whitelists to do anything, 
 and denies all others.
 Rather than writing an integration test for this class we can probably just 
 use this class for the TLS and SASL authentication testing.
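
A minimal sketch of the naive implementation described above, assuming a PermissionManager-style interface roughly like the one on the wiki (the trait shape and names are guesses for illustration, not the final interface):

    // Illustrative only: permit a request if either the user or the client IP is whitelisted.
    trait PermissionManager {
      def isPermitted(user: String, clientIp: String): Boolean
    }

    class WhitelistPermissionManager(userWhitelist: Set[String],
                                     ipWhitelist: Set[String]) extends PermissionManager {
      override def isPermitted(user: String, clientIp: String): Boolean =
        userWhitelist.contains(user) || ipWhitelist.contains(clientIp)
    }

KafkaApis would then call such a check before serving the main requests, as described above.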



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Andrii Biletskyi
Gwen,

Yes, it looks like KAFKA-1927 will leave TopicMetadataRequest/Response in place.
But I believe the KIP is still tightly related to KAFKA-1927, since we are
not only
going to update TopicMetadataRequest there but will also introduce a bunch
of new requests. And it probably makes sense to do those correctly from
scratch - without introducing scala request objects. As I understand it, you'll
have this common infrastructure code done in KAFKA-1927.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 8:38 PM, Gwen Shapira gshap...@cloudera.com wrote:

 On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote:
  Andrii,
 
  Thanks for the summary.
 
  1. I just realized that in order to start working on KAFKA-1927, we will
  need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
  This is planned to be done as part of KAFKA-1634. So, we will need
 Guozhang
  and Joel's help to wrap this up.

 I mentioned this in a separate thread, but it may be more relevant here:
 It looks like the SimpleConsumer API exposes TopicMetadataRequest and
 TopicMetadataResponse.
 This means that KAFKA-1927 doesn't remove this duplication.

 So I'm not sure we actually need KAFKA-1927 before implementing this KIP.
 This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it
 means we can proceed in parallel?

  2. Thinking about this a bit more, if the semantic of those write
  requests is async (i.e., after the client gets a response, it just means
  that the operation is initiated, but not necessarily completed), we don't
  really need to forward the requests to the controller. Instead, the
  receiving broker can just write the operation to ZK as the admin command
  line tool previously does. This will simplify the implementation.
 
  8. There is another implementation detail for describe topic. Ideally, we
  want to read the topic config from the broker cache, instead of
 ZooKeeper.
  Currently, every broker reads the topic-level config for all topics.
  However, it ignores those for topics not hosted on itself. So, we may
 need
  to change TopicConfigManager a bit so that it caches the configs for all
  topics.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
  Guys,
 
  Thanks for a great discussion!
  Here are the actions points:
 
  1. Q: Get rid of all scala requests objects, use java protocol
 definitions.
  A: Gwen kindly took that (KAFKA-1927). It's important to speed up
  review procedure
   there since this ticket blocks other important changes.
 
  2. Q: Generic re-routing facility vs client maintaining cluster state.
  A: Jay has added pseudo code to KAFKA-1912 - need to consider
 whether
  this will be
  easy to implement as a server-side feature (comments are
  welcomed!).
 
  3. Q: Controller field in wire protocol.
  A: This might be useful for clients, add this to
 TopicMetadataResponse
  (already in KIP).
 
  4. Q: Decoupling topic creation from TMR.
  A: I will add proposed by Jun solution (using clientId for that) to
 the
  KIP.
 
  5. Q: Bumping new versions of TMR vs grabbing all protocol changes in
 one
  version.
  A: It was decided to try to gather all changes to protocol (before
  release).
  In case of TMR it worth checking: KAFKA-2020 and KIP-13 (quotas)
 
  6. Q: JSON lib is needed to deserialize user's input in CLI tool.
  A: Use jackson for that, /tools project is a separate jar so
 shouldn't
  be a big deal.
 
  7.  Q: VerifyReassingPartitions vs generic status check command.
   A: For long-running requests like reassign partitions *progress*
 check
  request is useful,
   it makes sense to introduce it.
 
   Please add, correct me if I missed something.
 
  Thanks,
  Andrii Biletskyi
 
  On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Joel,
  
   You are right, I removed ClusterMetadata because we have partially
   what we need in TopicMetadata. Also, as Jay pointed out earlier, we
   would like to have orthogonal API, but at the same time we need
   to be backward compatible.
  
   But I like your idea and even have some other arguments for this
 option:
   There is also DescribeTopicRequest which was proposed in this KIP,
   it returns topic configs, partitions, replication factor plus
 partition
   ISR, ASR,
   leader replica. The later part is really already there in
   TopicMetadataRequest.
   So again we'll have to add stuff to TMR, not to duplicate some info in
   newly added requests. However, this way we'll end up with monster
   request which returns cluster metadata, topic replication and config
 info
   plus partition replication data. Seems logical to split TMR to
   - ClusterMetadata (brokers + controller, maybe smth else)
   - TopicMetadata (topic info + partition details)
   But since current TMR is involved in lots of places (including network
   client,
   as I understand) this might be very 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jun Rao
Yes, that looks reasonable. Then, in KafkaApi, when we get into the
handling of a specific request, we can construct the specific request
object from struct.
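
A small sketch of that last step, going from the generic (header, struct) pair to a specific request object (the Struct-based constructors and the common supertype are assumptions here):

    import org.apache.kafka.common.protocol.ApiKeys
    import org.apache.kafka.common.protocol.types.Struct
    import org.apache.kafka.common.requests.{AbstractRequestResponse, FetchRequest, ProduceRequest, RequestHeader}

    // Illustrative dispatch inside KafkaApis once the header and body are available.
    def requestFor(header: RequestHeader, body: Struct): AbstractRequestResponse =
      ApiKeys.forId(header.apiKey.toInt) match {
        case ApiKeys.PRODUCE => new ProduceRequest(body)
        case ApiKeys.FETCH   => new FetchRequest(body)
        case other           => throw new IllegalArgumentException(s"Unhandled api key $other")
      }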

Thanks,

Jun

On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Hi Jun,

 I was taking a slightly different approach. Let me know if it makes
 sense to you:

 1. Get the bytes from network (kinda unavoidable...)
 2. Modify RequestChannel.Request to contain header and body (instead
 of a single object)
 3. Create the header and body from bytes as follows:
 val header: RequestHeader = RequestHeader.parse(buffer)
 val apiKey: Int = header.apiKey
 val body: Struct =
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
 4. KafkaAPIs will continue getting RequestChannel.Request, but will
 now have access to body and header separately.

 I agree that I need a Request/Response objects that contain only the
 body for all requests objects.
 I'm thinking of implementing them in o.a.k.Common.Requests in Java for
 consistency.

 When we are discussing the requests/responses used in SimpleConsumer,
 we mean everything used in javaapi, right?

 Gwen



 On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
  Hi, Gwen,
 
  I was thinking that we will be doing the following in KAFKA-1927.
 
  1. Get the bytes from network.
  2. Use a new generic approach to convert bytes into request objects.
  2.1 Read the fixed request header (using the util in client).
  2.2 Based on the request id in the header, deserialize the rest of the
  bytes into a request specific object (using the new java objects).
  3. We will then be passing a header and an AbstractRequestResponse to
  KafkaApis.
 
  In order to do that, we will need to create similar request/response
  objects for internal requests such as StopReplica, LeaderAndIsr,
  UpdateMetadata, ControlledShutdown. Not sure whether they should be
 written
  in java or scala, but perhaps they should be only in the core project.
 
  Also note, there are some scala requests/responses used directly in
  SimpleConsumer. Since that's our public api, we can't remove those scala
  objects until the old consumer is phased out. We can remove the rest of
 the
  scala request objects.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Hi,
 
  I'm starting this thread for the various questions I run into while
  refactoring the server to use client requests and responses.
 
  Help is appreciated :)
 
  First question: LEADER_AND_ISR request and STOP_REPLICA request are
  unimplemented in the client.
 
  Do we want to implement them as part of this refactoring?
  Or should we continue using the scala implementation for those?
 
  Gwen
 



Re: Review Request 31568: Patch for KAFKA-1989

2015-03-18 Thread Yasuhiro Matsuda


 On March 18, 2015, 4:14 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/DelayedOperation.scala, line 373
  https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line373
 
  Trying to purge each watcher list in each doWork will likely increase
  the CPU usage a lot; we can reduce it by only purging when watched()
  is too large.

A list scan happens only when the unreachable count is greater than 100. The
unreachable count is incremented by polling the reference queue, so it is
driven by GC cycles in some sense. If there is nothing to purge, the scan won't
happen. And it is set to zero when tryCompleteWatched is called. So, I expect
the unreachable count to be low most of the time.

There is a problem in using watched(): it is the count of watched requests. On
a busy server, watched() may exceed the threshold more often even when there
are not many entries to purge.
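
A minimal sketch of that trigger (the 100 threshold is from this discussion; the
rest of the names are illustrative, not the actual patch):

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ListBuffer

class WeakWatcherList[T <: AnyRef] {
  private val refQueue = new ReferenceQueue[T]()
  private val entries = ListBuffer.empty[WeakReference[T]]
  private var unreachable = 0

  def watch(item: T): Unit = entries += new WeakReference[T](item, refQueue)

  def purgeIfNeeded(): Unit = {
    while (refQueue.poll() != null) unreachable += 1   // incremented as the GC clears refs
    if (unreachable > 100) {                           // scan only when there is enough to purge
      entries --= entries.filter(_.get == null)        // drop the dead references
      unreachable = 0                                  // reset after the scan
    }
  }
}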


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76899
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated Feb. 28, 2015, 12:14 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 Repository: kafka
 
 
 Description
 ---
 
 new purgatory implementation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/DelayedOperation.scala 
 e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
 7a37617395b9e4226853913b8989f42e7301de7c 
   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31568/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




Re: [DISCUSS] KIP-5 - Broker Configuration Management

2015-03-18 Thread Andrii Biletskyi
Aditya,

Yes, you are right. We agreed to think about how global config changes can
be applied w/o restarting the broker, consumers/producers. I haven't
yet updated the KIP accordingly though.
In short, the problem is that KafkaConfig is spread out over different
components, like SocketServer, OffsetManager etc. This makes
it hard to dynamically change config, at least if we follow the dynamic
LogConfig approach. So my opinion and vision is that we need to
treat each config block (like quotas in your case) separately. Logically,
different cases will be handled differently, and some of the configs are
not eligible for dynamic change at all.
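
A small sketch of what "treat each config block separately" could look like (all
names, including the quota keys, are illustrative only):

trait DynamicConfigHandler {
  def keys: Set[String]                                  // the keys this block can apply live
  def onUpdate(updated: Map[String, String]): Unit
}

class QuotaConfigHandler extends DynamicConfigHandler {
  override def keys: Set[String] = Set("quota.producer.default", "quota.consumer.default")
  override def onUpdate(updated: Map[String, String]): Unit = {
    // re-read only the quota settings; SocketServer, num.io.threads etc. stay restart-only
  }
}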

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 6:54 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Hi Andrii,

 Thanks for the writeup.
 IMO, we should be able to support dynamically changing certain configs.
 For example: the Quota proposal relies on such a mechanism for changing
 quotas on the fly.

 Aditya
 
 From: Andrii Biletskyi [andrii.bilets...@stealth.ly]
 Sent: Tuesday, March 03, 2015 10:30 AM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP-5 - Broker Configuration Management

 Hey Jun,

 Initially, instead of 3-4, I was thinking of stating that global config
 changes
 take effect only after broker restart. So it's just:

 3-4. On each broker startup apply global config from ZK

 In other words, the comprehensive workflow is the following:
 1. Issue ChangeGlobalConfigRequest
 2. Controller validates / stores config in ZK
 (out of scope of this KIP) 3. Do rolling restart for brokers one by one to
 pick up
 config changes

 My understanding is that we won't be able to handle config changes
 dynamically
 as we do for Log config. The main reason, from my point of view, is that
 broker config covers settings such as num.io.threads, updating which would
 mean
 gracefully restarting some of the broker's components (in this case
 SocketServer), which,
 in turn, might be very tricky.

 Thanks,
 Andrii Biletskyi

 On Tue, Mar 3, 2015 at 7:43 PM, Jun Rao j...@confluent.io wrote:

  It seems the proposed workflow is the following.
 
  1. Client issues a global config update request to the broker.
  2. Broker writes the new config to ZK.
  3. Controller picks up the changes from ZK.
  4. Controller propagates the config changes to all brokers.
 
  Do we need to add a new request/response to propagate the config changes?
 
  Also, this logic is a bit different from how per topic config changes
  works: each broker reads from ZK directly. It would be good to make them
  consistent.
 
  Thanks,
 
  Jun
 
 
  On Wed, Jan 21, 2015 at 10:34 PM, Joe Stein joe.st...@stealth.ly
 wrote:
 
   Created a KIP
  
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-5+-+Broker+Configuration+Management
  
   JIRA https://issues.apache.org/jira/browse/KAFKA-1786
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
 



Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
Looking at the SimpleConsumer, we need to leave in place:

ConsumerMetadataResponse
FetchRequest / FetchResponse
OffsetFetchRequest / OffsetFetchResponse
OffsetCommitRequest / OffsetCommitResponse
OffsetRequest / OffsetResponse
TopicMetadata
TopicMetadataRequest / TopicMetadataResponse

Specifically, TopicMetadata request and response will remain
duplicated after KAFKA-1927.
So I am no longer sure why this is a blocker for KIP-4.

Gwen



On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote:
 Hi Jun,

 I was taking a slightly different approach. Let me know if it makes
 sense to you:

 1. Get the bytes from network (kinda unavoidable...)
 2. Modify RequestChannel.Request to contain header and body (instead
 of a single object)
 3. Create the head and body from bytes as follow:
 val header: RequestHeader = RequestHeader.parse(buffer)
 val apiKey: Int = header.apiKey
 val body: Struct =
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
 4. KafkaAPIs will continue getting RequestChannel.Request, but will
 now have access to body and header separately.

 I agree that I need a Request/Response objects that contain only the
 body for all requests objects.
 I'm thinking of implementing them in o.a.k.Common.Requests in Java for
 consistency.

 When we are discussing the requests/responses used in SimpleConsumer,
 we mean everything used in javaapi, right?

 Gwen



 On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
 Hi, Gwen,

 I was thinking that we will be doing the following in KAFKA-1927.

 1. Get the bytes from network.
 2. Use a new generic approach to convert bytes into request objects.
 2.1 Read the fixed request header (using the util in client).
 2.2 Based on the request id in the header, deserialize the rest of the
 bytes into a request specific object (using the new java objects).
 3. We will then be passing a header and an AbstractRequestResponse to
 KafkaApis.

 In order to do that, we will need to create similar request/response
 objects for internal requests such as StopReplica, LeaderAndIsr,
 UpdateMetadata, ControlledShutdown. Not sure whether they should be written
 in java or scala, but perhaps they should be only in the core project.

 Also note, there are some scala requests/responses used directly in
 SimpleConsumer. Since that's our public api, we can't remove those scala
 objects until the old consumer is phased out. We can remove the rest of the
 scala request objects.

 Thanks,

 Jun


 On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.

 Help is appreciated :)

 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.

 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?

 Gwen



Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Joel Koshy
(Thanks Andrii for the summary)

For (1) yes we will circle back on that shortly after syncing up in
person. I think it is close to getting committed although development
for KAFKA-1927 can probably begin without it.

There is one more item we covered at the hangout, i.e., whether we
want to add the coordinator to the topic metadata response or provide
a clearer ClusterMetadataRequest.

There are two reasons I think we should try and avoid adding the
field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka then the field would add
  little to no value. (It still helps to have a separate
  ClusterMetadataRequest to query for cluster-wide information such as
  'which broker is the controller?' as Joe mentioned.)

I think it would be cleaner to have an explicit ClusterMetadataRequest
that you can send to any broker in order to obtain the controller (and
in the future possibly other cluster-wide information). I think the
main argument against doing this and instead adding it to the topic
metadata response was convenience - i.e., you don't have to discover
the controller in advance. However, I don't see much actual
benefit/convenience in this and in fact think it is a non-issue. Let
me know if I'm overlooking something here.

As an example, say we need to initiate partition reassignment by
issuing the new ReassignPartitionsRequest to the controller (assume we
already have the desired manual partition assignment).  If we are to
augment topic metadata response then the flow would be something like this:

- Issue topic metadata request to any broker (and discover the
  controller)
- Connect to controller if required (i.e., if the broker above !=
  controller)
- Issue the partition reassignment request to the controller.

With an explicit cluster metadata request it would be:
- Issue cluster metadata request to any broker
- Connect to controller if required (i.e., if the broker above !=
  controller)
- Issue the partition reassignment request

So it seems to add little practical value and bloats topic metadata
response with an irrelevant detail.
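
Either way, the client-side code boils down to roughly this (purely illustrative:
ClusterMetadataRequest/Response do not exist yet, and Node/send() stand in for
whatever client plumbing ends up being used):

object ControllerLookupSketch {
  case class Node(host: String, port: Int)
  class ClusterMetadataRequest
  case class ClusterMetadataResponse(controller: Node)

  def send[R](broker: Node, request: Any): R = ???       // placeholder transport

  // Ask any broker for the controller, then talk to the controller directly.
  def findController(anyBroker: Node): Node =
    send[ClusterMetadataResponse](anyBroker, new ClusterMetadataRequest).controller
}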

The other angle to this is the following - is it a matter of naming?
Should we just rename topic metadata request/response to just
MetadataRequest/Response and add cluster metadata to it? By that same
token should we also allow querying for the consumer coordinator (and
in future transaction coordinator) as well? This leads to a bloated
request which isn't very appealing and altogether confusing.

Thanks,

Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
 Andrii,
 
 Thanks for the summary.
 
 1. I just realized that in order to start working on KAFKA-1927, we will
 need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
 This is planned to be done as part of KAFKA-1634. So, we will need Guozhang
 and Joel's help to wrap this up.
 
 2. Thinking about this a bit more, if the semantic of those write
 requests is async (i.e., after the client gets a response, it just means
 that the operation is initiated, but not necessarily completed), we don't
 really need to forward the requests to the controller. Instead, the
 receiving broker can just write the operation to ZK as the admin command
 line tool previously does. This will simplify the implementation.
 
 8. There is another implementation detail for describe topic. Ideally, we
 want to read the topic config from the broker cache, instead of ZooKeeper.
 Currently, every broker reads the topic-level config for all topics.
 However, it ignores those for topics not hosted on itself. So, we may need
 to change TopicConfigManager a bit so that it caches the configs for all
 topics.
 
 Thanks,
 
 Jun
 
 
 On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
 andrii.bilets...@stealth.ly wrote:
 
  Guys,
 
  Thanks for a great discussion!
  Here are the actions points:
 
  1. Q: Get rid of all scala requests objects, use java protocol definitions.
  A: Gwen kindly took that (KAFKA-1927). It's important to speed up
  review procedure
   there since this ticket blocks other important changes.
 
  2. Q: Generic re-reroute facility vs client maintaining cluster state.
  A: Jay has added pseudo code to KAFKA-1912 - need to consider whether
  this will be
  easy to implement as a server-side feature (comments are
  welcomed!).
 
  3. Q: Controller field in wire protocol.
  A: This might be useful for clients, add this to TopicMetadataResponse
  (already in KIP).
 
  4. Q: Decoupling topic creation from TMR.
  A: I will add proposed by Jun solution (using clientId for that) to the
  KIP.
 
  5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one
  version.
  A: It was decided to try to gather all changes to protocol (before
  release).
  In case of TMR it worth checking: KAFKA-2020 and KIP-13 (quotas)
 
  6. Q: JSON lib is needed to deserialize user's input in CLI tool.
  A: Use jackson for that, /tools project is a 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Gwen Shapira
Got it. Thanks for clarifying!

On Wed, Mar 18, 2015 at 11:54 AM, Andrii Biletskyi
andrii.bilets...@stealth.ly wrote:
 Gwen,

 Yes, looks like KAFKA-1927 will leave TopicMetadataRequest/Response.
 But I believe the KIP is still tightly related to KAFKA-1927 since we are
 not only
 going to update TopicMetadataRequest there but will introduce a bunch
 of new requests too. And it probably makes sense to do those correctly from
 scratch - without introducing scala request objects. As I understand it, you'll
 have this common infrastructure code done in KAFKA-1927.

 Thanks,
 Andrii Biletskyi

 On Wed, Mar 18, 2015 at 8:38 PM, Gwen Shapira gshap...@cloudera.com wrote:

 On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote:
  Andri,
 
  Thanks for the summary.
 
  1. I just realized that in order to start working on KAFKA-1927, we will
  need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
  This is planned to be done as part of KAFKA-1634. So, we will need
 Guozhang
  and Joel's help to wrap this up.

 I mentioned this in a separate thread, but it may be more relevant here:
 It looks like the SimpleConsumer API exposes TopicMetadataRequest and
 TopicMetadataResponse.
 This means that KAFKA-1927 doesn't remove this duplication.

 So I'm not sure we actually need KAFKA-1927 before implementing this KIP.
 This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it
 means we can proceed in parallel?

  2. Thinking about this a bit more, if the semantic of those write
  requests is async (i.e., after the client gets a response, it just means
  that the operation is initiated, but not necessarily completed), we don't
  really need to forward the requests to the controller. Instead, the
  receiving broker can just write the operation to ZK as the admin command
  line tool previously does. This will simplify the implementation.
 
  8. There is another implementation detail for describe topic. Ideally, we
  want to read the topic config from the broker cache, instead of
 ZooKeeper.
  Currently, every broker reads the topic-level config for all topics.
  However, it ignores those for topics not hosted on itself. So, we may
 need
  to change TopicConfigManager a bit so that it caches the configs for all
  topics.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
  Guys,
 
  Thanks for a great discussion!
  Here are the actions points:
 
  1. Q: Get rid of all scala requests objects, use java protocol
 definitions.
  A: Gwen kindly took that (KAFKA-1927). It's important to speed up
  review procedure
   there since this ticket blocks other important changes.
 
  2. Q: Generic re-reroute facility vs client maintaining cluster state.
  A: Jay has added pseudo code to KAFKA-1912 - need to consider
 whether
  this will be
  easy to implement as a server-side feature (comments are
  welcomed!).
 
  3. Q: Controller field in wire protocol.
  A: This might be useful for clients, add this to
 TopicMetadataResponse
  (already in KIP).
 
  4. Q: Decoupling topic creation from TMR.
  A: I will add proposed by Jun solution (using clientId for that) to
 the
  KIP.
 
  5. Q: Bumping new versions of TMR vs grabbing all protocol changes in
 one
  version.
  A: It was decided to try to gather all changes to protocol (before
  release).
  In case of TMR it worth checking: KAFKA-2020 and KIP-13 (quotas)
 
  6. Q: JSON lib is needed to deserialize user's input in CLI tool.
  A: Use jackson for that, /tools project is a separate jar so
 shouldn't
  be a big deal.
 
  7.  Q: VerifyReassignPartitions vs generic status check command.
   A: For long-running requests like reassign partitions *progress*
 check
  request is useful,
   it makes sense to introduce it.
 
   Please add, correct me if I missed something.
 
  Thanks,
  Andrii Biletskyi
 
  On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Joel,
  
   You are right, I removed ClusterMetadata because we have partially
   what we need in TopicMetadata. Also, as Jay pointed out earlier, we
   would like to have orthogonal API, but at the same time we need
   to be backward compatible.
  
   But I like your idea and even have some other arguments for this
 option:
   There is also DescribeTopicRequest which was proposed in this KIP,
   it returns topic configs, partitions, replication factor plus
 partition
   ISR, ASR,
   leader replica. The later part is really already there in
   TopicMetadataRequest.
   So again we'll have to add stuff to TMR, not to duplicate some info in
   newly added requests. However, this way we'll end up with monster
   request which returns cluster metadata, topic replication and config
 info
   plus partition replication data. Seems logical to split TMR to
   - ClusterMetadata (brokers + controller, maybe smth else)
   - TopicMetadata (topic info + partition 

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
OK, Andrii clarified that we need KAFKA-1927 before KIP-4 to provide the
infrastructure for using the common request/response classes in core.

Jun, when you got a moment, please confirm if the approach I'm taking
is acceptable, or if you see issues that I'm missing.



On Wed, Mar 18, 2015 at 11:33 AM, Gwen Shapira gshap...@cloudera.com wrote:
 Looking at the SimpleConsumer, we need to leave in place:

 ConsumerMetadataResponse
 FetchRequest / FetchResponse
 OffsetFetchRequest / OffsetFetchResponse
 OffsetCommitRequest / OffsetCommitResponse
 OffsetRequest / OffsetResponse
 TopicMetadata
 TopicMetadataRequest / TopicMetadataResponse

 Specifically, TopicMetadata request and response will remain
 duplicated after KAFKA-1927.
 So I am no longer sure why this is a blocker for KIP-4.

 Gwen



 On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote:
 Hi Jun,

 I was taking a slightly different approach. Let me know if it makes
 sense to you:

 1. Get the bytes from network (kinda unavoidable...)
 2. Modify RequestChannel.Request to contain header and body (instead
 of a single object)
 3. Create the head and body from bytes as follow:
 val header: RequestHeader = RequestHeader.parse(buffer)
 val apiKey: Int = header.apiKey
 val body: Struct =
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
 4. KafkaAPIs will continue getting RequestChannel.Request, but will
 now have access to body and header separately.

 I agree that I need a Request/Response objects that contain only the
 body for all requests objects.
 I'm thinking of implementing them in o.a.k.Common.Requests in Java for
 consistency.

 When we are discussing the requests/responses used in SimpleConsumer,
 we mean everything used in javaapi, right?

 Gwen



 On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
 Hi, Gwen,

 I was thinking that we will be doing the following in KAFKA-1927.

 1. Get the bytes from network.
 2. Use a new generic approach to convert bytes into request objects.
 2.1 Read the fixed request header (using the util in client).
 2.2 Based on the request id in the header, deserialize the rest of the
 bytes into a request specific object (using the new java objects).
 3. We will then be passing a header and an AbstractRequestResponse to
 KafkaApis.

 In order to do that, we will need to create similar request/response
 objects for internal requests such as StopReplica, LeaderAndIsr,
 UpdateMetadata, ControlledShutdown. Not sure whether they should be written
 in java or scala, but perhaps they should be only in the core project.

 Also note, there are some scala requests/responses used directly in
 SimpleConsumer. Since that's our public api, we can't remove those scala
 objects until the old consumer is phased out. We can remove the rest of the
 scala request objects.

 Thanks,

 Jun


 On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 I'm starting this thread for the various questions I run into while
 refactoring the server to use client requests and responses.

 Help is appreciated :)

 First question: LEADER_AND_ISR request and STOP_REPLICA request are
 unimplemented in the client.

 Do we want to implement them as part of this refactoring?
 Or should we continue using the scala implementation for those?

 Gwen



Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Gwen Shapira
See inline responses:

On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
 Hey Gwen,

 This makes sense to me.

 A couple of thoughts, mostly confirming what you said I think:

1. Ideally we would move completely over to the new style of request
definition for server-side processing, even for the internal requests. This
way all requests would have the same header/body struct stuff. As you say
for the internal requests we can just delete the scala code. For the old
clients they will continue to use their old request definitions until we
eol them. I would propose that new changes will go only into the new
request/response objects and the old scala ones will be permanently stuck
on their current version until discontinued. So after this change that old
scala code could be considered frozen.

SimpleConsumer is obviously stuck with the old request/response.

The Producers can be converted to the common request/response without
breaking compatibility.
I think we should do this (even though it requires fiddling with
additional network serialization code), just so we can throw the old
ProduceRequest away.

Does that make sense?


2. I think it would be reasonable to keep all the requests under common,
even though as you point out there is currently no use for some of them
beyond broker-to-broker communication at the moment.

Yep.

3. We should think a little about how versioning will work. Making this
convenient on the server side is an important goal for the new style of
request definition. At the serialization level we now handle versioning but
the question we should discuss and work out is how this will map to the
request objects (which I assume will remain unversioned).

The way I see it working (I just started on this, so I may have gaps):

* The request header contains the version.
* When we read the request, we use ProtoUtils.requestSchema, which
takes the version as a parameter and is responsible for giving us the right
Schema, which we use to read the buffer and get the correct struct.
* KafkaApis handlers have the header, so they can use it to access the
correct fields, build the correct response, etc.

Does that sound about right?
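
For example, a handler could branch on the header version like this (a sketch
only; the retention_time field name for OffsetCommitRequest v2 is an assumption
here):

object VersionAwareHandling {
  import org.apache.kafka.common.protocol.types.Struct
  import org.apache.kafka.common.requests.RequestHeader

  def handleOffsetCommit(header: RequestHeader, body: Struct): Unit = {
    val retentionMs: Long =
      if (header.apiVersion >= 2) body.getLong("retention_time")  // only present in v2+
      else -1L                                                    // older versions: no field
    // ... build the response using the schema for header.apiVersion ...
  }
}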


4. Ideally after this refactoring the network package should not be
dependent on the individual request objects. The intention is that stuff in
kafka.network is meant to be generic network infrastructure that doesn't
know about the particular fetch/produce apis we have implemented on top.

I'll make a note to validate that this is the case.


 -Jay

 On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

 Hi Jun,

 I was taking a slightly different approach. Let me know if it makes
 sense to you:

 1. Get the bytes from network (kinda unavoidable...)
 2. Modify RequestChannel.Request to contain header and body (instead
 of a single object)
 3. Create the head and body from bytes as follow:
 val header: RequestHeader = RequestHeader.parse(buffer)
 val apiKey: Int = header.apiKey
 val body: Struct =
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
 4. KafkaAPIs will continue getting RequestChannel.Request, but will
 now have access to body and header separately.

 I agree that I need a Request/Response objects that contain only the
 body for all requests objects.
 I'm thinking of implementing them in o.a.k.Common.Requests in Java for
 consistency.

 When we are discussing the requests/responses used in SimpleConsumer,
 we mean everything used in javaapi, right?

 Gwen



 On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
  Hi, Gwen,
 
  I was thinking that we will be doing the following in KAFKA-1927.
 
  1. Get the bytes from network.
  2. Use a new generic approach to convert bytes into request objects.
  2.1 Read the fixed request header (using the util in client).
  2.2 Based on the request id in the header, deserialize the rest of the
  bytes into a request specific object (using the new java objects).
  3. We will then be passing a header and an AbstractRequestResponse to
  KafkaApis.
 
  In order to do that, we will need to create similar request/response
  objects for internal requests such as StopReplica, LeaderAndIsr,
  UpdateMetadata, ControlledShutdown. Not sure whether they should be
 written
  in java or scala, but perhaps they should be only in the core project.
 
  Also note, there are some scala requests/responses used directly in
  SimpleConsumer. Since that's our public api, we can't remove those scala
  objects until the old consumer is phased out. We can remove the rest of
 the
  scala request objects.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Hi,
 
  I'm starting this thread for the various questions I run into while
  refactoring the server to use client requests and responses.
 
  Help is appreciated :)
 
  First question: 

Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-18 Thread Jay Kreps
Personally I'm in favor of (1) just to reduce the number of different APIs.
People will find the difference between abort and close subtle and
confusing and the only instance where you want it is this somewhat unusual
case you guys are pursuing, right?

-Jay

On Wed, Mar 18, 2015 at 2:13 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It looks like we have another option, and we are now deciding between the
 following two interfaces:

 1. Close() + close(timeout)
   - timeout could be either positive or zero.
   - only close(0) can be called from sender thread

 2. Close() + abort() + close(timeout)
   - timeout can either be positive or zero
   - only abort() can be called from sender thread

   - abort() is equivalent to close(0) in 1) but does not join sender
 thread and does not close metrics.
   - Another thread has to call close() or close(timeout) in order to make
 sure the resources in producer are gone.

 The two approaches provide the same functionality we need; the difference is
 that approach 2) follows the convention of close() and abort(). On the other
 hand, approach 1) saves one interface compared with approach 2) but does not
 follow the convention.

 When the two approaches come to user code, it is probably something like
 this:

 try {
   while (!finished)
     producer.send(record, callback);
 } catch (Exception e) {
   producer.close(5);
 }

 class CallbackImpl implements Callback {
   public void onCompletion(RecordMetadata metadata, Exception e) {
     if (e != null)
       abort();   // or close(0), depending on which approach we pick
   }
 }

 Because the two approaches lead to almost the same user code, and assuming
 users always call producer.close() as a clean-up step, I personally
 prefer approach 2) as it follows convention.

 Any thoughts?

 Jiangjie (Becket) Qin


 On 3/17/15, 10:25 AM, Jiangjie Qin j...@linkedin.com wrote:

 Hi Jun,
 
 Yes, as Guozhang said, the main reason we set a flag is because close(0)
 is expected to be called by the sender thread itself.
 If we want to maintain the semantic meaning of close(), one alternative is
 to have an abort() method that does the same thing as close(0) except cleanup.
 And in close(timeout), after the timeout we call abort() and join the sender
 thread. This was one of the previous proposals. We merged abort into close(0)
 because they are almost doing the same thing. But from what you mentioned,
 it might make sense to have two separate methods.
 
 Thanks.
 
 Jiangjie (Becket) Qin
 
 On 3/16/15, 10:31 PM, Guozhang Wang wangg...@gmail.com wrote:
 
 Yeah, in this sense the sender thread will not exit immediately in the
 close(0) call, but will only terminate after the current response batch
 has been processed, as will the producer instance itself.
 
 There is a reason for this though: for a clean shutdown the caller thread
 has to wait for the sender thread to join before closing the producer
 instance, but this cannot be achieved if close(0) is called by the sender
 thread itself (for example in KAFKA-1659, there is a proposal from Andrew
 Stein on using thread.interrupt and thread.stop, but if it is called by the
 ioThread itself the stop call will fail). Hence we came up with the flag
 approach to let the sender thread close as soon as it is at the barrier
 of the run loop.
 
 Guozhang
 
 On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote:
 
  Hmm, does that mean that after close(0), the sender thread is not
  necessarily gone? Normally, after closing an entity, we expect all internal
  threads associated with the entity to be shut down completely.
 
  Thanks,
 
  Jun
 
  On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  wrote:
 
   Hi Jun,
  
   Close(0) will set two flags in sender. Running=false and a newly
 added
   forceClose=true. It will also set accumulator.closed=true so no
 further
   producer.send() will succeed.
   The sender thread will finish executing all the callbacks in current
  batch
   of responses, then it will see the forceClose flag. It will just fail
 all
   the incomplete batches in the producer and exit.
   So close(0) is a non-blocking call and sender thread will not try to
 join
   itself in close(0).
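
A sketch of that flag approach (names are illustrative, not the real Sender
internals):

abstract class SenderSketch extends Runnable {
  @volatile private var running = true
  @volatile private var forceClose = false

  protected def runOnce(): Unit                  // send requests, fire callbacks
  protected def failIncompleteBatches(): Unit    // fail whatever is still buffered
  protected def drainRemainingRecords(): Unit    // graceful path: flush what is left

  override def run(): Unit = {
    while (running) runOnce()
    if (forceClose) failIncompleteBatches()      // close(0): exit without draining
    else drainRemainingRecords()                 // close(timeout): drain, then exit
  }

  def initiateClose(): Unit = running = false    // what close(timeout) flips

  def initiateForceClose(): Unit = {             // what close(0) flips; no join, non-blocking
    forceClose = true
    running = false
  }
}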
  
   Thanks.
  
   Jiangjie (Becket) Qin
  
   On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote:
  
   How does close(0) work if it's called from the sender thread? If
  close(0)
   needs to wait for the sender thread to join, wouldn't this cause a
   deadlock?
   
   Thanks,
   
   Jun
   
   On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin
 j...@linkedin.com.invalid
  
   wrote:
   
Thanks Guozhang. It wouldn’t be as thoroughly considered without
discussing with you :)
   
Jiangjie (Becket) Qin
   
On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:
   
Thanks Jiangjie,

After talking to you offline on this, I have been convinced and
   changed my
preference to blocking. The immediate shutdown approach does have
  some
unsafeness in some cases.

Guozhang

On Mon, Mar 16, 2015 at 11:50 AM, 

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Jun Rao
Joel, Andril,

I think we agreed that those admin requests can be issued to any broker.
Because of that, there doesn't seem to be a strong need to know the
controller. So, perhaps we can proceed by not making any change to the
format of TMR right now. When we start using create topic request in the
producer, we will need a new version of TMR that doesn't trigger auto topic
creation. But that can be done later.

As a first cut implementation, I think the broker can just write to ZK
directly for
createTopic/alterTopic/reassignPartitions/preferredLeaderElection
requests, instead of forwarding them to the controller. This will simplify
the implementation on the broker side.
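
For create topic, that first cut can literally reuse what the CLI does today,
e.g. (parameter names are illustrative):

object CreateTopicFirstCut {
  import java.util.Properties
  import kafka.admin.AdminUtils
  import org.I0Itec.zkclient.ZkClient

  // Write the topic assignment/config to ZK, exactly as the admin tool does.
  def handleCreateTopic(zkClient: ZkClient, topic: String,
                        partitions: Int, replicationFactor: Int,
                        config: Properties = new Properties()): Unit =
    AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, config)
}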

Thanks,

Jun


On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:

 (Thanks Andrii for the summary)

 For (1) yes we will circle back on that shortly after syncing up in
 person. I think it is close to getting committed although development
 for KAFKA-1927 can probably begin without it.

 There is one more item we covered at the hangout. i.e., whether we
 want to add the coordinator to the topic metadata response or provide
 a clearer ClusterMetadataRequest.

 There are two reasons I think we should try and avoid adding the
 field:
 - It is irrelevant to topic metadata
 - If we finally do request rerouting in Kafka then the field would add
   little to no value. (It still helps to have a separate
   ClusterMetadataRequest to query for cluster-wide information such as
   'which broker is the controller?' as Joe mentioned.)

 I think it would be cleaner to have an explicit ClusterMetadataRequest
 that you can send to any broker in order to obtain the controller (and
 in the future possibly other cluster-wide information). I think the
 main argument against doing this and instead adding it to the topic
 metadata response was convenience - i.e., you don't have to discover
 the controller in advance. However, I don't see much actual
 benefit/convenience in this and in fact think it is a non-issue. Let
 me know if I'm overlooking something here.

 As an example, say we need to initiate partition reassignment by
 issuing the new ReassignPartitionsRequest to the controller (assume we
 already have the desired manual partition assignment).  If we are to
 augment topic metadata response then the flow would be something like this:

 - Issue topic metadata request to any broker (and discover the
   controller)
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request to the controller.

 With an explicit cluster metadata request it would be:
 - Issue cluster metadata request to any broker
 - Connect to controller if required (i.e., if the broker above !=
   controller)
 - Issue the partition reassignment request

 So it seems to add little practical value and bloats topic metadata
 response with an irrelevant detail.

 The other angle to this is the following - is it a matter of naming?
 Should we just rename topic metadata request/response to just
 MetadataRequest/Response and add cluster metadata to it? By that same
 token should we also allow querying for the consumer coordinator (and
 in future transaction coordinator) as well? This leads to a bloated
 request which isn't very appealing and altogether confusing.

 Thanks,

 Joel

 On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
  Andri,
 
  Thanks for the summary.
 
  1. I just realized that in order to start working on KAFKA-1927, we will
  need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
  This is planned to be done as part of KAFKA-1634. So, we will need
 Guozhang
  and Joel's help to wrap this up.
 
  2. Thinking about this a bit more, if the semantic of those write
  requests is async (i.e., after the client gets a response, it just means
  that the operation is initiated, but not necessarily completed), we don't
  really need to forward the requests to the controller. Instead, the
  receiving broker can just write the operation to ZK as the admin command
  line tool previously does. This will simplify the implementation.
 
  8. There is another implementation detail for describe topic. Ideally, we
  want to read the topic config from the broker cache, instead of
 ZooKeeper.
  Currently, every broker reads the topic-level config for all topics.
  However, it ignores those for topics not hosted on itself. So, we may
 need
  to change TopicConfigManager a bit so that it caches the configs for all
  topics.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi 
  andrii.bilets...@stealth.ly wrote:
 
   Guys,
  
   Thanks for a great discussion!
   Here are the actions points:
  
   1. Q: Get rid of all scala requests objects, use java protocol
 definitions.
   A: Gwen kindly took that (KAFKA-1927). It's important to speed up
   review procedure
there since this ticket blocks other important changes.
  
   2. Q: Generic re-reroute facility vs 

Re: Review Request 31568: Patch for KAFKA-1989

2015-03-18 Thread Guozhang Wang


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
  https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187
 
  TBD
 
 Guozhang Wang wrote:
 Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add
 return Boolean instead of Unit, indicating whether the task has not expired
 and was successfully added to the timer. And then we can change the above to:
 
 if (!operation.isComplete()) {
   if (!timeoutTimer.add(operation)) {
     operation.cancel()
   }
 }
 
 Yasuhiro Matsuda wrote:
 An expired task is always successfully added to the timer and executed
 immediately. Do you mean completed instead of expired? TimerTask does not
 have a notion of completion. I kept the completion concept out of the Timer
 implementation since it is not essential to Timer functionality.

That makes sense. I guess I was just trying to avoid calling 
operation.isComplete consecutively, which may actually not be a bad thing.


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
  https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68
 
  I think bucket.flush(reinsert) will always fail on all the items since
  their expiration time will always be < bucket expiration + tickMs, i.e. the
  returned bucket from the delayed queue has already expired all its items.
  In this case, could we just call foreach(submit) on all of them instead of
  trying to reinsert them?
 
 Yasuhiro Matsuda wrote:
 It is true only for the lowest wheel. Reinsert is necessary to make
 timing wheels work. A bucket from a higher wheel may contain tasks that are
 not yet expired (the tick time is longer in a higher wheel).
 
 Guozhang Wang wrote:
 OK, I may be missing something here, but this is my reasoning:
 
 The bucket is only returned from the delayed queue in line 62 if its
 expiration time has passed currentTime; after that at least the lowest wheel
 will advance to its expiration time, and hence the add call within the reinsert
 is doomed to fail as task.expirationTime < wheel's time + tickMs.
 
 Yasuhiro Matsuda wrote:
 If the expired bucket is from the lowest wheel, all tasks in the bucket
 are expired. Reinsert submits the tasks to a thread pool for execution.
 If the expired bucket is from a higher wheel, tasks are either expired or
 not expired. Reinsert submits the expired tasks to a thread pool and moves
 unexpired tasks to lower wheels.
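
In other words (a self-contained sketch with stand-in types; add() returning
false for an already-expired entry is taken from this discussion, not copied
from the patch):

import java.util.concurrent.ExecutorService

trait Entry { def task: Runnable }
trait Wheel { def add(entry: Entry): Boolean }   // false: the entry has already expired

class ReinsertSketch(wheel: Wheel, executor: ExecutorService) {
  def reinsert(entry: Entry): Unit =
    if (!wheel.add(entry))            // could not go back into any wheel: it is due now
      executor.submit(entry.task)     // run the expired task on the thread pool
    // otherwise add() has moved it into a finer-grained (lower) wheel
}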

Got it.


 On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
  https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72
 
  It seems the task entry of the task will only be set once throughout
  its lifetime; even when the task entry gets reinserted, its correspondence
  to the task will not change, right?
  
  If that is true we can just set the entry for the task in the
  constructor of the task entry.
 
 Yasuhiro Matsuda wrote:
 This sets the TimerTaskEntry on the TimerTask. A TimerTask is created
 independently from a Timer, then enqueued to a Timer.
 
 Guozhang Wang wrote:
 Yes, but can we move this line to line 119 of TimerTaskList.scala? Then 
 in line 46 of Timer when we create the TimerTaskEntry with the passed in 
 TimerTask its entry field will be set automatically.
 
 Yasuhiro Matsuda wrote:
 If a task already enqueued to a timer is enqueued again, intentionally or
 unintentionally (= a bug), what happens?
 My intention here is to keep the data structure consistent in such a case.
 setTimerTaskEntry removes the old entry if one exists.

This is true, I was originally confused about whether we ever need to 
re-enqueue, but the previous comment made it clear to me now.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated Feb. 28, 2015, 12:14 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 Repository: kafka
 
 
 Description
 ---
 
 new purgatory implementation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/DelayedOperation.scala 
 e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
   

Re: Review Request 32193: Patch for KAFKA-1997

2015-03-18 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32193/#review76955
---

Ship it!


Ship It!

- Guozhang Wang


On March 18, 2015, 7:47 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32193/
 ---
 
 (Updated March 18, 2015, 7:47 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1997
 https://issues.apache.org/jira/browse/KAFKA-1997
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Replaced producer config properties string with macros
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/MirrorMaker.scala 
 11acc3103e4e4a30d7380e26355ccba09b3304bb 
 
 Diff: https://reviews.apache.org/r/32193/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-18 Thread Andrii Biletskyi
Jun,

I might be wrong, but didn't we agree we would let any broker from the
cluster handle *long-running* admin requests (at this time preferredReplica
and reassignPartitions) via the zk admin path? Thus CreateTopics etc. should
be sent only to the controller.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:

 Joel, Andril,

 I think we agreed that those admin requests can be issued to any broker.
 Because of that, there doesn't seem to be a strong need to know the
 controller. So, perhaps we can proceed by not making any change to the
 format of TMR right now. When we start using create topic request in the
 producer, we will need a new version of TMR that doesn't trigger auto topic
 creation. But that can be done later.

 As a first cut implementation, I think the broker can just write to ZK
 directly for
 createTopic/alterTopic/reassignPartitions/preferredLeaderElection
 requests, instead of forwarding them to the controller. This will simplify
 the implementation on the broker side.

 Thanks,

 Jun


 On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:

  (Thanks Andrii for the summary)
 
  For (1) yes we will circle back on that shortly after syncing up in
  person. I think it is close to getting committed although development
  for KAFKA-1927 can probably begin without it.
 
  There is one more item we covered at the hangout. i.e., whether we
  want to add the coordinator to the topic metadata response or provide
  a clearer ClusterMetadataRequest.
 
  There are two reasons I think we should try and avoid adding the
  field:
  - It is irrelevant to topic metadata
  - If we finally do request rerouting in Kafka then the field would add
little to no value. (It still helps to have a separate
ClusterMetadataRequest to query for cluster-wide information such as
'which broker is the controller?' as Joe mentioned.)
 
  I think it would be cleaner to have an explicit ClusterMetadataRequest
  that you can send to any broker in order to obtain the controller (and
  in the future possibly other cluster-wide information). I think the
  main argument against doing this and instead adding it to the topic
  metadata response was convenience - i.e., you don't have to discover
  the controller in advance. However, I don't see much actual
  benefit/convenience in this and in fact think it is a non-issue. Let
  me know if I'm overlooking something here.
 
  As an example, say we need to initiate partition reassignment by
  issuing the new ReassignPartitionsRequest to the controller (assume we
  already have the desired manual partition assignment).  If we are to
  augment topic metadata response then the flow would be something like this:
 
  - Issue topic metadata request to any broker (and discover the
    controller)
  - Connect to controller if required (i.e., if the broker above !=
controller)
  - Issue the partition reassignment request to the controller.
 
  With an explicit cluster metadata request it would be:
  - Issue cluster metadata request to any broker
  - Connect to controller if required (i.e., if the broker above !=
controller)
  - Issue the partition reassignment request
 
  So it seems to add little practical value and bloats topic metadata
  response with an irrelevant detail.
 
  The other angle to this is the following - is it a matter of naming?
  Should we just rename topic metadata request/response to just
  MetadataRequest/Response and add cluster metadata to it? By that same
  token should we also allow querying for the consumer coordinator (and
  in future transaction coordinator) as well? This leads to a bloated
  request which isn't very appealing and altogether confusing.
 
  Thanks,
 
  Joel
 
  On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
   Andri,
  
   Thanks for the summary.
  
   1. I just realized that in order to start working on KAFKA-1927, we
 will
   need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
   This is planned to be done as part of KAFKA-1634. So, we will need
  Guozhang
   and Joel's help to wrap this up.
  
   2. Thinking about this a bit more, if the semantic of those write
   requests is async (i.e., after the client gets a response, it just
 means
   that the operation is initiated, but not necessarily completed), we
 don't
   really need to forward the requests to the controller. Instead, the
   receiving broker can just write the operation to ZK as the admin
 command
   line tool previously does. This will simplify the implementation.
  
   8. There is another implementation detail for describe topic. Ideally,
 we
   want to read the topic config from the broker cache, instead of
  ZooKeeper.
   Currently, every broker reads the topic-level config for all topics.
   However, it ignores those for topics not hosted on itself. So, we may
  need
   to change TopicConfigManager a bit so that it caches the configs for
 all
   topics.
  
   Thanks,
  
   Jun
  

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jun Rao
3. The way that things are working right now is that there is a single
request object for all versions. The layout of the request object always
corresponds to the latest version. Under normal version evolution, the
request object should be able to be constructed from the binary of any version.
For example, if an old version is missing a field, the request object will
just be using a default value. If an old version has an extra field, it's
just ignored.
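
For illustration (made-up request, real Schema/Field/Type classes): a field added
in v1 carries a default, so a v0 payload still fills the single, latest-layout
request object completely.

import org.apache.kafka.common.protocol.types.{Field, Schema, Type}

object FooRequestSchemas {
  val V0 = new Schema(new Field("topic", Type.STRING))
  val V1 = new Schema(new Field("topic", Type.STRING),
                      new Field("timeout_ms", Type.INT32, "how long to wait", 30000))
}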

Thanks,

Jun

On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

 See inline responses:

 On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:
  Hey Gwen,
 
  This makes sense to me.
 
  A couple of thoughts, mostly confirming what you said I think:
 
 1. Ideally we would move completely over to the new style of request
 definition for server-side processing, even for the internal
 requests. This
 way all requests would have the same header/body struct stuff. As you
 say
 for the internal requests we can just delete the scala code. For the
 old
 clients they will continue to use their old request definitions until
 we
 eol them. I would propose that new changes will go only into the new
 request/response objects and the old scala ones will be permanently
 stuck
 on their current version until discontinued. So after this change
 that old
 scala code could be considered frozen.

 SimpleConsumer is obviously stuck with the old request/response.

 The Producers can be converted to the common request/response without
 breaking compatibility.
 I think we should do this (even though it requires fiddling with
 additional network serialization code), just so we can throw the old
 ProduceRequest away.

 Does that make sense?


 2. I think it would be reasonable to keep all the requests under
 common,
 even though as you point out there is currently no use for some of
 them
 beyond broker-to-broker communication at the moment.

 Yep.

 3. We should think a little about how versioning will work. Making
 this
 convenient on the server side is an important goal for the new style
 of
 request definition. At the serialization level we now handle
 versioning but
 the question we should discuss and work out is how this will map to
 the
 request objects (which I assume will remain unversioned).

 The way I see it working (I just started on this, so I may have gaps):

 * Request header contains the version
 * When we read the request, we use ProtoUtils.requestSchema which
 takes version as a parameter and is responsible to give us the right
 Schema, which we use to read the buffer and get the correct struct.
 * KafkaApis handlers have the header, so they can use it to access the
 correct fields, build the correct response, etc.

 Does that sound about right?


 4. Ideally after this refactoring the network package should not be
 dependent on the individual request objects. The intention is that
 stuff in
 kafka.network is meant to be generic network infrastructure that
 doesn't
 know about the particular fetch/produce apis we have implemented on
 top.

 I'll make a note to validate that this is the case.

 
  -Jay
 
  On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
  Hi Jun,
 
  I was taking a slightly different approach. Let me know if it makes
  sense to you:
 
  1. Get the bytes from network (kinda unavoidable...)
  2. Modify RequestChannel.Request to contain header and body (instead
  of a single object)
  3. Create the head and body from bytes as follow:
  val header: RequestHeader = RequestHeader.parse(buffer)
  val apiKey: Int = header.apiKey
  val body: Struct =
 
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
  4. KafkaAPIs will continue getting RequestChannel.Request, but will
  now have access to body and header separately.
 
  I agree that I need a Request/Response objects that contain only the
  body for all requests objects.
  I'm thinking of implementing them in o.a.k.Common.Requests in Java for
  consistency.
 
  When we are discussing the requests/responses used in SimpleConsumer,
  we mean everything used in javaapi, right?
 
  Gwen
 
 
 
  On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
   Hi, Gwen,
  
   I was thinking that we will be doing the following in KAFKA-1927.
  
   1. Get the bytes from network.
   2. Use a new generic approach to convert bytes into request objects.
   2.1 Read the fixed request header (using the util in client).
   2.2 Based on the request id in the header, deserialize the rest of the
   bytes into a request specific object (using the new java objects).
   3. We will then be passing a header and an AbstractRequestResponse to
   KafkaApis.
  
   In order to do that, we will need to create similar request/response
   objects for internal requests such as StopReplica, LeaderAndIsr,
   UpdateMetadata, ControlledShutdown. Not 

Build failed in Jenkins: Kafka-trunk #427

2015-03-18 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/427/changes

Changes:

[wangguoz] KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs 
right

--
[...truncated 1921 lines...]
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40)

kafka.integration.PrimitiveApiTest  testPipelinedProduceRequests FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:40)
at 
kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33)
at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40)

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest  testResetToEarliestWhenOffsetTooLow 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest  testResetToLatestWhenOffsetTooHigh 
FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33)
at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32)
at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44)
at 
kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46)

kafka.integration.AutoOffsetResetTest  testResetToLatestWhenOffsetTooLow FAILED
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at 
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
at 
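
The repeated java.net.BindException failures above come from each test fixture binding its embedded ZooKeeper to the same fixed port. A minimal Java sketch of one way around that, assuming the harness can be told which port to use (TestPortUtil and chooseFreePort are hypothetical names, not existing Kafka test code):

    import java.net.ServerSocket;

    public final class TestPortUtil {

        // Bind to port 0 so the OS hands back a free ephemeral port, then release
        // it for the embedded server to use. There is a small window in which
        // another process could grab the port, but it avoids hard-coded ports.
        public static int chooseFreePort() throws Exception {
            try (ServerSocket socket = new ServerSocket(0)) {
                return socket.getLocalPort();
            }
        }

        public static void main(String[] args) throws Exception {
            System.out.println("free port: " + chooseFreePort());
        }
    }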

Re: Review Request 32193: Patch for KAFKA-1997

2015-03-18 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32193/
---

(Updated March 18, 2015, 7:47 p.m.)


Review request for kafka.


Bugs: KAFKA-1997
https://issues.apache.org/jira/browse/KAFKA-1997


Repository: kafka


Description (updated)
---

Replaced producer config properties string with macros


Diffs (updated)
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
11acc3103e4e4a30d7380e26355ccba09b3304bb 

Diff: https://reviews.apache.org/r/32193/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1997) Refactor Mirror Maker

2015-03-18 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1997:

Attachment: KAFKA-1997_2015-03-18_12:47:32.patch

 Refactor Mirror Maker
 -

 Key: KAFKA-1997
 URL: https://issues.apache.org/jira/browse/KAFKA-1997
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1997.patch, KAFKA-1997.patch, 
 KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, 
 KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, 
 KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, 
 KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, 
 KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, 
 KAFKA-1997_2015-03-18_12:47:32.patch


 Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-18 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367761#comment-14367761
 ] 

Jiangjie Qin commented on KAFKA-1997:
-

Updated reviewboard https://reviews.apache.org/r/32193/diff/
 against branch origin/trunk

 Refactor Mirror Maker
 -

 Key: KAFKA-1997
 URL: https://issues.apache.org/jira/browse/KAFKA-1997
 Project: Kafka
  Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
 Attachments: KAFKA-1997.patch, KAFKA-1997.patch, 
 KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, 
 KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, 
 KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, 
 KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, 
 KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, 
 KAFKA-1997_2015-03-18_12:47:32.patch


 Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-18 Thread Jay Kreps
Hey Steven,

The current proposal is actually to enforce quotas at the
client/application level, NOT the topic level. So if you have a service
with a few dozen instances the quota is against all of those instances
added up across all their topics. So actually the effect would be the same
either way but throttling gives the producer the choice of either blocking
or dropping.

-Jay
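
A rough sketch of the kind of per-client byte-rate accounting being discussed, where going over quota translates into a response delay rather than an error. This is purely illustrative, with made-up names and a simplistic one-second window; it is not the KIP-13 design:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: track bytes per client id over a one-second window and
    // return how long a response should be held back to bring that client's
    // observed rate back under its quota.
    public class SimpleQuotaTracker {

        private final long bytesPerSecondQuota;
        private final Map<String, Long> windowBytes = new HashMap<>();
        private long windowStartMs = System.currentTimeMillis();

        public SimpleQuotaTracker(long bytesPerSecondQuota) {
            this.bytesPerSecondQuota = bytesPerSecondQuota;
        }

        public synchronized long recordAndGetDelayMs(String clientId, long bytes) {
            long now = System.currentTimeMillis();
            if (now - windowStartMs >= 1000) {          // roll over to a new window
                windowBytes.clear();
                windowStartMs = now;
            }
            long total = windowBytes.merge(clientId, bytes, Long::sum);
            if (total <= bytesPerSecondQuota)
                return 0;                               // under quota, no delay
            // Over quota: delay long enough that the effective rate equals the quota.
            return (total - bytesPerSecondQuota) * 1000 / bytesPerSecondQuota;
        }
    }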

On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

 Jay,

 let's say an app produces to 10 different topics. one of the topic is sent
 from a library. due to whatever condition/bug, this lib starts to send
 messages over the quota. if we go with the delayed response approach, it
 will cause the whole shared RecordAccumulator buffer to be filled up. that
 will penalize other 9 topics who are within the quota. that is the
 unfairness point that Ewen and I were trying to make.

 if broker just drop the msg and return an error/status code indicates the
 drop and why. then producer can just move on and accept the drop. shared
 buffer won't be saturated and other 9 topics won't be penalized.

 Thanks,
 Steven



 On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Steven,
 
  It is true that hitting the quota will cause back-pressure on the
 producer.
  But the solution is simple, a producer that wants to avoid this should
 stay
  under its quota. In other words this is a contract between the cluster
 and
  the client, with each side having something to uphold. Quite possibly the
  same thing will happen in the absence of a quota, a client that produces
 an
  unexpected amount of load will hit the limits of the server and
 experience
  backpressure. Quotas just allow you to set that same limit at something
  lower than 100% of all resources on the server, which is useful for a
  shared cluster.
 
  -Jay
 
  On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com
 wrote:
 
   wait. we create one kafka producer for each cluster. each cluster can
  have
   many topics. if producer buffer got filled up due to delayed response
 for
   one throttled topic, won't that penalize other topics unfairly? it
 seems
  to
   me that broker should just return error without delay.
  
   sorry that I am chatting to myself :)
  
   On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com
  wrote:
  
I think I can answer my own question. delayed response will cause the
producer buffer to be full, which then result in either thread
 blocking
   or
message drop.
   
On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com
   wrote:
   
please correct me if I am missing sth here. I am not understanding
 how
would throttle work without cooperation/back-off from producer. new
  Java
producer supports non-blocking API. why would delayed response be
 able
   to
slow down producer? producer will continue to fire async sends.
   
On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com
 
wrote:
   
I think we are really discussing two separate issues here:
   
1. Whether we should a) append-then-block-then-returnOKButThrottled
  or
   b)
block-then-returnFailDuetoThrottled for quota actions on produce
requests.
   
Both these approaches assume some kind of well-behaveness of the
   clients:
option a) assumes the client sets an proper timeout value while can
   just
ignore OKButThrottled response, while option b) assumes the
 client
handles the FailDuetoThrottled appropriately. For any malicious
   clients
that, for example, just keep retrying either intentionally or not,
neither
of these approaches are actually effective.
   
2. For OKButThrottled and FailDuetoThrottled responses, shall
 we
encode
them as error codes or augment the protocol to use a separate field
indicating status codes.
   
Today we have already incorporated some status code as error codes
 in
   the
responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros
 of
   this
is of course using a single field for response status like the HTTP
status
codes, while the cons is that it requires clients to handle the
 error
codes
carefully.
   
I think maybe we can actually extend the single-code approach to
   overcome
its drawbacks, that is, wrap the error codes semantics to the users
  so
that
users do not need to handle the codes one-by-one. More concretely,
following Jay's example the client could write sth. like this:
   
   
-
   
   if (error.isOK())
      // status code is good or the code can be simply ignored for this
      // request type: process the request
   else if (error.needsRetry())
      // throttled, transient error, etc.: retry
   else if (error.isFatal())
      // non-retriable errors, etc.: notify / terminate / other handling
   
-
   
Only when the clients really want to handle, for example
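
To make the status-code helpers sketched above concrete, a hedged Java sketch of such a wrapper could look like the enum below, so client code branches on isOK()/needsRetry()/isFatal() rather than on individual codes. The enum and its members are hypothetical, not the existing client Errors class:

    // Hypothetical status-code wrapper along the lines Guozhang sketches above;
    // the actual codes and their grouping would come from the protocol definition.
    public enum ResponseStatus {
        NONE(false, false),
        OK_BUT_THROTTLED(false, false),
        REQUEST_TIMED_OUT(true, false),
        FAIL_DUE_TO_THROTTLED(true, false),
        CORRUPT_MESSAGE(false, true);

        private final boolean retriable;
        private final boolean fatal;

        ResponseStatus(boolean retriable, boolean fatal) {
            this.retriable = retriable;
            this.fatal = fatal;
        }

        public boolean isOK()       { return !retriable && !fatal; }
        public boolean needsRetry() { return retriable; }
        public boolean isFatal()    { return fatal; }
    }

A callback would then only need the three-way branch shown in the quoted snippet, no matter how many concrete codes exist.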
  

Re: [Discussion] Using Client Requests and Responses in Server

2015-03-18 Thread Jay Kreps
Hey Gwen,

This makes sense to me.

A couple of thoughts, mostly confirming what you said I think:

   1. Ideally we would move completely over to the new style of request
   definition for server-side processing, even for the internal requests. This
   way all requests would have the same header/body struct stuff. As you say
   for the internal requests we can just delete the scala code. For the old
   clients they will continue to use their old request definitions until we
   eol them. I would propose that new changes will go only into the new
   request/response objects and the old scala ones will be permanently stuck
   on their current version until discontinued. So after this change that old
   scala code could be considered frozen.
   2. I think it would be reasonable to keep all the requests under common,
   even though as you point out there is currently no use for some of them
   beyond broker-to-broker communication at the moment.
   3. We should think a little about how versioning will work. Making this
   convenient on the server side is an important goal for the new style of
   request definition. At the serialization level we now handle versioning but
   the question we should discuss and work out is how this will map to the
   request objects (which I assume will remain unversioned).
   4. Ideally after this refactoring the network package should not be
   dependent on the individual request objects. The intention is that stuff in
   kafka.network is meant to be generic network infrastructure that doesn't
   know about the particular fetch/produce apis we have implemented on top.


-Jay

On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Hi Jun,

 I was taking a slightly different approach. Let me know if it makes
 sense to you:

 1. Get the bytes from network (kinda unavoidable...)
 2. Modify RequestChannel.Request to contain header and body (instead
 of a single object)
 3. Create the head and body from bytes as follow:
 val header: RequestHeader = RequestHeader.parse(buffer)
 val apiKey: Int = header.apiKey
 val body: Struct =
 ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
 4. KafkaAPIs will continue getting RequestChannel.Request, but will
 now have access to body and header separately.
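
A Java-flavored sketch of what steps 2-4 above could hand to the API layer, with a header/body pair instead of a request-specific wrapper. RequestAndHeader and the dispatcher are made-up names for illustration only:

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.RequestHeader;

    // Hypothetical holder for what RequestChannel.Request could carry.
    public class RequestAndHeader {
        public final RequestHeader header;
        public final Struct body;

        public RequestAndHeader(RequestHeader header, Struct body) {
            this.header = header;
            this.body = body;
        }
    }

    // Hypothetical dispatch keyed off the header's api key.
    class ApiDispatcher {
        void handle(RequestAndHeader request) {
            ApiKeys api = ApiKeys.forId(request.header.apiKey());
            switch (api) {
                case METADATA:
                    handleTopicMetadata(request.body);
                    break;
                case LEADER_AND_ISR:
                    handleLeaderAndIsr(request.body);
                    break;
                default:
                    throw new IllegalArgumentException("Unhandled api key " + api);
            }
        }

        private void handleTopicMetadata(Struct body) { /* ... */ }

        private void handleLeaderAndIsr(Struct body) { /* ... */ }
    }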

 I agree that I need a Request/Response objects that contain only the
 body for all requests objects.
 I'm thinking of implementing them in o.a.k.Common.Requests in Java for
 consistency.

 When we are discussing the requests/responses used in SimpleConsumer,
 we mean everything used in javaapi, right?

 Gwen



 On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
  Hi, Gwen,
 
  I was thinking that we will be doing the following in KAFKA-1927.
 
  1. Get the bytes from network.
  2. Use a new generic approach to convert bytes into request objects.
  2.1 Read the fixed request header (using the util in client).
  2.2 Based on the request id in the header, deserialize the rest of the
  bytes into a request specific object (using the new java objects).
  3. We will then be passing a header and an AbstractRequestResponse to
  KafkaApis.
 
  In order to do that, we will need to create similar request/response
  objects for internal requests such as StopReplica, LeaderAndIsr,
  UpdateMetadata, ControlledShutdown. Not sure whether they should be
 written
  in java or scala, but perhaps they should be only in the core project.
 
  Also note, there are some scala requests/responses used directly in
  SimpleConsumer. Since that's our public api, we can't remove those scala
  objects until the old consumer is phased out. We can remove the rest of
 the
  scala request objects.
 
  Thanks,
 
  Jun
 
 
  On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Hi,
 
  I'm starting this thread for the various questions I run into while
  refactoring the server to use client requests and responses.
 
  Help is appreciated :)
 
  First question: LEADER_AND_ISR request and STOP_REPLICA request are
  unimplemented in the client.
 
  Do we want to implement them as part of this refactoring?
  Or should we continue using the scala implementation for those?
 
  Gwen
 



[jira] [Commented] (KAFKA-1912) Create a simple request re-routing facility

2015-03-18 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367966#comment-14367966
 ] 

Jay Kreps commented on KAFKA-1912:
--

1. I think this can be a single thread. The number of such requests is bounded 
by the number of connections since we process only one request at a time. In 
this respect it is not different from the purgatories, which can maintain an 
entry per connection and have a background thread.
2. Probably from the client's point of view a timeout is a timeout, no?
3. I think it is okay for this queue to be unbounded as the request objects 
will be small and there is an implicit bound from the number of connections 
(you can never have more than one request in flight for any connection). I 
agree if you tried to bound it that would add all sorts of complexity.
4. Not sure if I understand that comment. [~guozhang] can you explain a bit 
more?
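
For illustration only, a single background thread over an unbounded queue could look roughly like the sketch below; every name here is hypothetical and this is not a proposed implementation:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Hypothetical single-threaded forwarding facility: requests that landed on
    // the wrong broker are queued here and proxied to the right one. The queue is
    // unbounded, but in practice it is capped by the number of connections since
    // each connection has at most one request in flight.
    public class RequestForwarder implements Runnable {

        public interface Forwardable {
            // Kick off a non-blocking send to the correct broker; the response is
            // proxied back to the original client via a callback.
            void forward();
        }

        private final BlockingQueue<Forwardable> pending = new LinkedBlockingQueue<>();
        private volatile boolean running = true;

        public void enqueue(Forwardable request) {
            pending.add(request);
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Forwardable next = pending.poll(100, TimeUnit.MILLISECONDS);
                    if (next != null)
                        next.forward();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    running = false;
                }
            }
        }

        public void shutdown() {
            running = false;
        }
    }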

 Create a simple request re-routing facility
 ---

 Key: KAFKA-1912
 URL: https://issues.apache.org/jira/browse/KAFKA-1912
 Project: Kafka
  Issue Type: Improvement
Reporter: Jay Kreps
 Fix For: 0.8.3


 We are accumulating a lot of requests that have to be directed to the correct 
 server. This makes sense for high volume produce or fetch requests. But it is 
 silly to put the extra burden on the client for the many miscellaneous 
 requests such as fetching or committing offsets and so on.
 This adds a ton of practical complexity to the clients with little or no 
 payoff in performance.
 We should add a generic request-type agnostic re-routing facility on the 
 server. This would allow any server to accept a request and forward it to the 
 correct destination, proxying the response back to the user. Naturally it 
 needs to do this without blocking the thread.
 The result is that a client implementation can choose to be optimally 
 efficient and manage a local cache of cluster state and attempt to always 
 direct its requests to the proper server OR it can choose simplicity and just 
 send things all to a single host and let that host figure out where to 
 forward it.
 I actually think we should implement this more or less across the board, but 
 some requests such as produce and fetch require more logic to proxy since 
 they have to be scattered out to multiple servers and gathered back to create 
 the response. So these could be done in a second phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-18 Thread Jiangjie Qin
It looks like we have another option and are now deciding between the following
two interfaces:

1. Close() + close(timeout)
  - timeout could be either positive or zero.
  - only close(0) can be called from sender thread

2. Close() + abort() + close(timeout)
  - timeout can either be positive or zero
  - only abort() can be called from sender thread

  - abort() is equivalent to close(0) in 1) but does not join sender
thread and does not close metrics.
  - Another thread has to call close() or close(timeout) in order to make
sure the resources in producer are gone.

The two approaches provide the same function we need; the difference is that
approach 2) follows the usual conventions of close() and abort(). On the other
hand, approach 1) saves one interface method compared with approach 2) but does
not follow that convention.

When the two approaches come to user code, it is probably something like
this:

try {
  while (!finished)
    producer.send(record, callback);
} catch (Exception e) {
  producer.close(5);
}

class CallbackImpl implements Callback {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      abort();   // or close(0), depending on the approach
  }
}

Because the two approaches lead to almost the same user code, and assuming
users always call producer.close() as a clean-up step, personally I prefer
approach 2) as it follows convention.

Any thoughts?

Jiangjie (Becket) Qin


On 3/17/15, 10:25 AM, Jiangjie Qin j...@linkedin.com wrote:

Hi Jun,

Yes, as Guozhang said, the main reason we set a flag is because close(0)
is expected to be called by the sender thread itself.
If we want to maintain the semantic meaning of close(), one alternative is
to have an abort() method that does the same thing as close(0) except cleanup.
And in close(timeout), after the timeout we call abort() and join the sender
thread. This was one of the previous proposals. We merged abort into close(0)
because they are almost doing the same thing. But from what you mentioned,
it might make sense to have two separate methods.

Thanks.

Jiangjie (Becket) Qin

On 3/16/15, 10:31 PM, Guozhang Wang wangg...@gmail.com wrote:

Yeah, in this sense the sender thread will not exit immediately in the
close(0) call, but will only terminate after the current response batch has
been processed, as will the producer instance itself.

There is a reason for this though: for a clean shutdown the caller thread
has to wait for the sender thread to join before closing the producer
instance, but this cannot be achieved if close(0) is called by the sender
thread itself (for example in KAFKA-1659, there is a proposal from Andrew
Stein on using thread.interrupt and thread.stop, but if it is called by the
ioThread itself the stop call will fail). Hence we came up with the flag
approach to let the sender thread close as soon as it is at the barrier
of the run loop.

Guozhang

On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote:

 Hmm, does that mean that after close(0), the sender thread is not necessarily
 gone? Normally, after closing an entity, we expect all internal threads
 associated with the entity to be shut down completely.

 Thanks,

 Jun

 On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin
j...@linkedin.com.invalid
 wrote:

  Hi Jun,
 
  Close(0) will set two flags in sender. Running=false and a newly
added
  forceClose=true. It will also set accumulator.closed=true so no
further
  producer.send() will succeed.
  The sender thread will finish executing all the callbacks in current
 batch
  of responses, then it will see the forceClose flag. It will just fail
all
  the incomplete batches in the producer and exit.
  So close(0) is a non-blocking call and sender thread will not try to
join
  itself in close(0).
 
  Thanks.
 
  Jiangjie (Becket) Qin
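
A condensed, hypothetical rendering of the flag handling described above; field and method names are illustrative, not the actual Sender code:

    // Illustrative only: how close(0) can signal the sender's run loop without
    // the sender thread ever trying to join itself.
    class SenderSketch implements Runnable {

        private volatile boolean running = true;
        private volatile boolean forceClose = false;

        @Override
        public void run() {
            while (running) {
                sendRequestsAndHandleResponses();   // completes callbacks for the current batch
                if (forceClose) {
                    failAllIncompleteBatches();     // incomplete batches get an error callback
                    return;                         // exit without joining anything
                }
            }
            // graceful path: drain remaining records, then exit
        }

        // What close(0) would trigger; safe to call from the sender thread itself.
        void initiateForceClose() {
            forceClose = true;
            running = false;
        }

        private void sendRequestsAndHandleResponses() { /* ... */ }

        private void failAllIncompleteBatches() { /* ... */ }
    }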
 
  On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote:
 
  How does close(0) work if it's called from the sender thread? If
 close(0)
  needs to wait for the sender thread to join, wouldn't this cause a
  deadlock?
  
  Thanks,
  
  Jun
  
  On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin
j...@linkedin.com.invalid
 
  wrote:
  
   Thanks Guozhang. It wouldn’t be as thoroughly considered without
   discussing with you :)
  
   Jiangjie (Becket) Qin
  
   On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote:
  
   Thanks Jiangjie,
   
   After talking to you offline on this, I have been convinced and
  changed my
   preference to blocking. The immediate shutdown approach does have
 some
   unsafeness in some cases.
   
   Guozhang
   
   On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
  j...@linkedin.com.invalid
   
   wrote:
   
 It looks like the problem we want to solve and the purpose we want to
 achieve is:
 If the user uses close() in the callback, we want to let the user be aware
 that they should use close(0) instead of close() in the callback.

 We have agreed that we will have an error log to inform the user about
 this mis-usage. The options differ in how we can force the user to take a
 look at that error log.
  

Re: [DISCUSS] KIP-11- Authorization design for kafka security

2015-03-18 Thread Michael Herstine
Hi Parth,

Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as
ALLOW? This can be handy setting up rules that have exceptions. E.g.
“Allow principal P to READ resource R from all hosts” with “Deny principal
P READ access to resource R from host H1” in combination would allow P to
READ R from all hosts *except* H1.

2. When a topic is newly created, will there be an ACL created for it? If
not, would that not deny subsequent access to it?

(nit) Maybe use Principal instead of String to represent principals?
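
As a sketch of the deny-overrides semantics in question 1 (types and method names here are illustrative only, not the KIP-11 interface):

    import java.util.List;

    // Illustrative only: evaluate ACLs so that an explicit DENY beats any ALLOW.
    public class DenyOverridesEvaluator {

        public enum Permission { ALLOW, DENY }

        public static class Acl {
            final String principal;
            final String host;        // "*" means any host
            final String operation;   // e.g. "READ"
            final Permission permission;

            Acl(String principal, String host, String operation, Permission permission) {
                this.principal = principal;
                this.host = host;
                this.operation = operation;
                this.permission = permission;
            }

            boolean matches(String p, String h, String op) {
                return principal.equals(p)
                    && (host.equals("*") || host.equals(h))
                    && operation.equals(op);
            }
        }

        public static boolean authorize(List<Acl> acls, String principal, String host, String operation) {
            boolean allowed = false;
            for (Acl acl : acls) {
                if (!acl.matches(principal, host, operation))
                    continue;
                if (acl.permission == Permission.DENY)
                    return false;      // an explicit deny always wins
                allowed = true;
            }
            return allowed;            // default deny if nothing matched
        }
    }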


On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote:

Parth

Overall it is looking good. Couple of questions…

- Can you give an example of how the policies will look in the default
implementation?
- In the operations, can we support "CONNECT" also? This can be used
during session connection.
- Regarding access control for "Topic Creation", since we can't do it on
the server side, can we de-scope it for now? And plan it as a future feature
request?

Thanks

Bosco

 

On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote:

Hi Parth,
Thanks for putting this together. Overall it looks good to
me. Although AdminUtils is a concern, KIP-4 can probably fix
that part.
Thanks,
Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote:
 Forgot to add links to wiki and jira.
 
 Link to wiki:
 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization
+
Interface
 Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688
 
 Thanks
 Parth
 
 From: Parth Brahmbhatt
 pbrahmbh...@hortonworks.commailto:pbrahmbh...@hortonworks.com
 Date: Thursday, March 5, 2015 at 10:33 AM
 To: dev@kafka.apache.orgmailto:dev@kafka.apache.org
 dev@kafka.apache.orgmailto:dev@kafka.apache.org
 Subject: [DISCUSS] KIP-11- Authorization design for kafka security
 
 Hi,
 
 KIP-11 is open for discussion , I have updated the wiki with the design
 and open questions.
 
 Thanks
 Parth