Re: [Discussion] Using Client Requests and Responses in Server
Gwen,

Thanks for bringing this up! Regarding UpdateMetadata in KIP-4 - no, it shouldn't be used in the Admin CLI; it's an internal server message. We will probably use TMR there (depending on how the generic re-routing facility goes), but TMR is already used in NetworkClient, so I believe there are no doubts about it - it should be ported to Java.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

I think those two requests are only used for controller-to-broker communication. Not sure if the client side will need them in KIP-4 - unlikely, I guess.

Jiangjie (Becket) Qin

On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

Hi,

I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :)

First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those?

Gwen
[jira] [Commented] (KAFKA-1858) Make ServerShutdownTest a bit less flaky
[ https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14366975#comment-14366975 ]

Adrian Preston commented on KAFKA-1858:
---

Created reviewboard https://reviews.apache.org/r/32199/diff/ against branch origin/trunk

Make ServerShutdownTest a bit less flaky

Key: KAFKA-1858
URL: https://issues.apache.org/jira/browse/KAFKA-1858
Project: Kafka
Issue Type: Bug
Reporter: Gwen Shapira
Attachments: KAFKA-1858.patch

ServerShutdownTest currently:
* Starts a KafkaServer
* Does stuff
* Stops the server
* Counts whether there are any live Kafka threads

This is fine on its own. But when running in a test suite (i.e. gradle test), the test is very sensitive to every other test freeing all of its resources. If you start a server in a previous test and forget to close it, ServerShutdownTest will find threads from the previous test and fail. This makes for a flaky test that is pretty challenging to troubleshoot.

I suggest counting the threads at the beginning and end of each test in the class, and only failing if the number at the end is greater than the number at the beginning.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
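The before-and-after thread comparison described in the report could be sketched roughly as follows. This is an illustrative Java sketch with made-up names, not the actual KAFKA-1858 patch (which is Scala):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class ThreadLeakCheck {
    // Names of all live threads at the moment of the call.
    public static Set<String> liveThreadNames() {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .collect(Collectors.toCollection(HashSet::new));
    }

    // Threads alive now that were not alive when `before` was captured.
    // A test would capture `before` in setup and fail if this is non-empty in teardown.
    public static Set<String> newThreads(Set<String> before) {
        Set<String> after = liveThreadNames();
        after.removeAll(before);
        return after;
    }

    public static void main(String[] args) throws Exception {
        Set<String> before = liveThreadNames();
        Thread t = new Thread(() -> { }, "test-worker");
        t.start();
        t.join(); // the thread has terminated, so it must not be reported as leaked
        System.out.println(newThreads(before).contains("test-worker")); // false: no leak
    }
}
```

Comparing sets of thread names (rather than raw counts) also gives the failure message a list of the offending threads, which is what the attached patch reportedly does.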
[jira] [Updated] (KAFKA-1858) Make ServerShutdownTest a bit less flaky
[ https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Adrian Preston updated KAFKA-1858:
--
Attachment: KAFKA-1858.patch

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 32199: Patch for KAFKA-1858
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32199/
---

Review request for kafka.

Bugs: KAFKA-1858
https://issues.apache.org/jira/browse/KAFKA-1858

Repository: kafka

Description
---
KAFKA-1858. Better checking for threads being cleaned up

Diffs
-
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80

Diff: https://reviews.apache.org/r/32199/diff/

Testing
---

Thanks,
Adrian Preston
[jira] [Commented] (KAFKA-1858) Make ServerShutdownTest a bit less flaky
[ https://issues.apache.org/jira/browse/KAFKA-1858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14366985#comment-14366985 ]

Adrian Preston commented on KAFKA-1858:
---

As per the description in the bug report, the patch tightens up checking by comparing the threads running when a test starts with the threads running when the test completes. If any new Kafka threads are found, the test is failed with an AssertionFailedError containing a list of the new threads.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1759) transient unit test failure in PartitionAssignorTest
[ https://issues.apache.org/jira/browse/KAFKA-1759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367063#comment-14367063 ]

Rajini Sivaram commented on KAFKA-1759:
---

This has been fixed under [KAFKA-1823]

transient unit test failure in PartitionAssignorTest

Key: KAFKA-1759
URL: https://issues.apache.org/jira/browse/KAFKA-1759
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao
Fix For: 0.8.3

Saw the following transient failure.

unit.kafka.consumer.PartitionAssignorTest testRoundRobinPartitionAssignor FAILED
    java.lang.UnsupportedOperationException: empty.max
        at scala.collection.TraversableOnce$class.max(TraversableOnce.scala:216)
        at scala.collection.AbstractIterator.max(Iterator.scala:1157)
        at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:190)
        at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2019) RoundRobinAssignor clusters by consumer
[ https://issues.apache.org/jira/browse/KAFKA-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367189#comment-14367189 ]

Jiangjie Qin commented on KAFKA-2019:
---

Yeah, you are right. I somehow thought the partition would be hashed to a particular thread. Yes, hashing the thread id will give no worse balance than the current round robin. Currently we are hashing over topic partition; maybe changing it to only hash the thread id is enough, so we don't need to hash both.

RoundRobinAssignor clusters by consumer

Key: KAFKA-2019
URL: https://issues.apache.org/jira/browse/KAFKA-2019
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Joseph Holsten
Assignee: Neha Narkhede
Priority: Minor
Attachments: 0001-sort-consumer-thread-ids-by-hashcode.patch, KAFKA-2019.patch

When rolling out a change today, I noticed that some of my consumers are greedy, taking far more partitions than others. The cause is that the RoundRobinAssignor is using a list of ConsumerThreadIds sorted by toString, which is {{"%s-%d".format(consumer, threadId)}}. This causes each consumer's threads to be adjacent to each other. One possible fix would be to define ConsumerThreadId.hashCode, and sort by that.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
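The clustering effect described in the report, and the proposed hashCode-based fix, can be illustrated with a small sketch. This is a Java illustration with hypothetical names, not Kafka's actual ConsumerThreadId implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class AssignOrderSketch {
    // Thread ids here stand in for Kafka's "%s-%d".format(consumer, threadId) strings.

    // Current behavior: lexicographic sort keeps each consumer's threads adjacent,
    // so a round-robin pass hands consecutive partitions to the same consumer.
    public static List<String> lexicographic(List<String> threadIds) {
        List<String> sorted = new ArrayList<>(threadIds);
        Collections.sort(sorted);
        return sorted;
    }

    // Proposed fix: sorting by hashCode interleaves consumers pseudo-randomly
    // instead of clustering each consumer's threads together.
    public static List<String> byHashCode(List<String> threadIds) {
        List<String> sorted = new ArrayList<>(threadIds);
        sorted.sort(Comparator.comparingInt(String::hashCode));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("consumerB-0", "consumerB-1", "consumerA-0", "consumerA-1");
        // consumerA's threads come out adjacent in lexicographic order.
        System.out.println(lexicographic(ids));
        // Hash order scatters the ids (the exact order depends on String.hashCode).
        System.out.println(byHashCode(ids));
    }
}
```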
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367275#comment-14367275 ]

Jiangjie Qin commented on KAFKA-2029:
---

+1 on this in general. The current design has several issues, including data loss during leader migration if acks is not set to -1. In the long term it is definitely worth refactoring the controller. Though I still think adding a timeout for putting a message into the controller-to-broker queue is dangerous and could lead to inconsistent states among brokers. We can discuss solutions to the problems you mentioned:

For 1) and 2), they seem to be introduced by setting the controller-to-broker message queue size back to bounded. For 3), I checked the latest trunk; the shutdown thread waits for the auto leader rebalance thread outside of the controller lock. Does this problem still stand? For 4) and 5), good point and +1.

Personally, I think prioritizing the controller-to-broker messages is a simpler approach to achieve the goal of fast leader migration and mitigate data loss. 4) and 5) are also worth doing.

Improving controlled shutdown for rolling updates

Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.

Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).

Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case).

Note 3: These improvements are actually workarounds, and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).

The problems and improvements are:

# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown failed, and proceeds to unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases its high watermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.

# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*.
ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings controller into completely non-functional state:
          // Controller to broker state change requests batch is not empty while creating a new one
          //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
Re: Review Request 32199: Patch for KAFKA-1858
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32199/#review76896
---

This will definitely make the test more stable. I am just wondering whether it makes sense to add a non-daemon thread check in each test, so we can make sure there is no non-daemon thread left over from some tests.

- Jiangjie Qin

On March 18, 2015, 11:12 a.m., Adrian Preston wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32199/
---

(Updated March 18, 2015, 11:12 a.m.)

Review request for kafka.

Bugs: KAFKA-1858
https://issues.apache.org/jira/browse/KAFKA-1858

Repository: kafka

Description
---
KAFKA-1858. Better checking for threads being cleaned up

Diffs
-
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b46daa436231d5aa5c1e2992fd5c2d9a73a30c80

Diff: https://reviews.apache.org/r/32199/diff/

Testing
---

Thanks,
Adrian Preston
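A check along the lines the review suggests could look roughly like this. It is an illustrative Java sketch with made-up names, not the code under review (which is Scala), and the exclusion of the main thread is an assumption for the demo:

```java
import java.util.Set;
import java.util.stream.Collectors;

public class NonDaemonThreadCheck {
    // Names of live non-daemon threads, excluding the main thread itself.
    // A test suite could call this in a teardown hook and fail if the set
    // contains anything that was not running before the test started.
    public static Set<String> nonDaemonThreadNames() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> !t.isDaemon())
                .map(Thread::getName)
                .filter(name -> !name.equals("main"))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Thread background = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException e) { }
        }, "background-daemon");
        background.setDaemon(true); // daemon threads are ignored by the check
        background.start();
        System.out.println(nonDaemonThreadNames().contains("background-daemon")); // false
    }
}
```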
Re: [Discussion] Using Client Requests and Responses in Server
Yes, I was talking about TopicMetadataRequest.scala (TMR), which is MetadataRequest in the java definitions. None of the messages mentioned above (StopReplica etc.) are required in KIP-4. Sorry if I misled someone.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 5:42 PM, Gwen Shapira gshap...@cloudera.com wrote:

Sorry, Andrii, I'm a bit confused. The following request/response pairs are currently not implemented in the client: StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Does KIP-4 require any of these? Also, what's TMR?

Gwen

On Wed, Mar 18, 2015 at 4:07 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Gwen,

Thanks for bringing this up! Regarding UpdateMetadata in KIP-4 - no, it shouldn't be used in the Admin CLI; it's an internal server message. We will probably use TMR there (depending on how the generic re-routing facility goes), but TMR is already used in NetworkClient, so I believe there are no doubts about it - it should be ported to Java.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:

I think those two requests are only used for controller-to-broker communication. Not sure if the client side will need them in KIP-4 - unlikely, I guess.

Jiangjie (Becket) Qin

On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

Hi,

I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :)

First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those?

Gwen
[jira] [Commented] (KAFKA-1912) Create a simple request re-routing facility
[ https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367331#comment-14367331 ]

Andrii Biletskyi commented on KAFKA-1912:
---

[~jkreps] thanks for adding the code. A few questions / comments:

1. Regarding the RequestRerouter shim: if we store and re-route unsent messages in a separate thread, outside the main io thread pool, wouldn't it blur the num.io.threads setting? With this facility in place there will be a RequestHandlerPool bounded by num.io.threads, plus a message queue with a worker managing not-yet-routed messages and unsent responses to clients.

2. On the message queue: if we bombard some broker with requests that require re-routing to the controller, wouldn't it be strange for the client to receive a TimeoutException because the _broker_ wasn't able to process them, rather than the controller, which is the real request executor?

3. Is it possible for NetworkClient (on the client side) to know whether the target broker's message queue is full and thus the broker may delay re-routing? Consider a high-priority admin task (like altering a topic's config) while the target broker is busy with produce / fetch requests.

4. This was mentioned by [~guozhang] in the mailing list; let me add his question too: the broker instance which carries the current controller is agnostic of its existence, and uses KafkaApis to handle general Kafka requests. Having all admin requests redirected to the controller instance will force the broker to be aware of its carried controller, and to access its internal data for handling these requests. Plus, with the number of clients out of Kafka's control, this may easily cause the controller to become a hot spot in terms of request load.
Create a simple request re-routing facility

Key: KAFKA-1912
URL: https://issues.apache.org/jira/browse/KAFKA-1912
Project: Kafka
Issue Type: Improvement
Reporter: Jay Kreps
Fix For: 0.8.3

We are accumulating a lot of requests that have to be directed to the correct server. This makes sense for high volume produce or fetch requests. But it is silly to put the extra burden on the client for the many miscellaneous requests such as fetching or committing offsets and so on. This adds a ton of practical complexity to the clients with little or no payoff in performance. We should add a generic, request-type agnostic re-routing facility on the server. This would allow any server to accept a request and forward it to the correct destination, proxying the response back to the user. Naturally it needs to do this without blocking the thread.

The result is that a client implementation can choose to be optimally efficient and manage a local cache of cluster state, attempting to always direct its requests to the proper server, OR it can choose simplicity and just send everything to a single host and let that host figure out where to forward it.

I actually think we should implement this more or less across the board, but some requests such as produce and fetch require more logic to proxy since they have to be scattered out to multiple servers and gathered back to create the response. So these could be done in a second phase.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367396#comment-14367396 ]

Jun Rao edited comment on KAFKA-1927 at 3/18/15 4:18 PM:
---

In order to work on this jira, we will need to merge KAFKA-1841 into trunk, which is waiting on KAFKA-1634.

was (Author: junrao):
In order to work on this jira, we will need to merge KAFKA-1841 into trunk, which is waiting on KAFA-1634.

Replace requests in kafka.api with requests in org.apache.kafka.common.requests

Key: KAFKA-1927
URL: https://issues.apache.org/jira/browse/KAFKA-1927
Project: Kafka
Issue Type: Improvement
Reporter: Jay Kreps
Assignee: Gwen Shapira
Fix For: 0.8.3

The common package introduced a better way of defining requests using a new protocol definition DSL and also includes wrapper objects for these. We should switch KafkaApis over to use these request definitions and consider the scala classes deprecated (we probably need to retain some of them for a while for the scala clients). This will be a big improvement because:
1. We will have each request now defined in only one place (Protocol.java)
2. We will have built-in support for multi-version requests
3. We will have much better error messages (no more cryptic underflow errors)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1634) Improve semantics of timestamp in OffsetCommitRequests and update documentation
[ https://issues.apache.org/jira/browse/KAFKA-1634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367394#comment-14367394 ]

Jun Rao commented on KAFKA-1634:
---

[~guozhang], [~jjkoshy], in order to work on KAFKA-1927, we will need to have KAFKA-1841 merged into trunk, which depends on this jira.

Improve semantics of timestamp in OffsetCommitRequests and update documentation

Key: KAFKA-1634
URL: https://issues.apache.org/jira/browse/KAFKA-1634
Project: Kafka
Issue Type: Bug
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Priority: Blocker
Fix For: 0.8.3
Attachments: KAFKA-1634.patch, KAFKA-1634_2014-11-06_15:35:46.patch, KAFKA-1634_2014-11-07_16:54:33.patch, KAFKA-1634_2014-11-17_17:42:44.patch, KAFKA-1634_2014-11-21_14:00:34.patch, KAFKA-1634_2014-12-01_11:44:35.patch, KAFKA-1634_2014-12-01_18:03:12.patch, KAFKA-1634_2015-01-14_15:50:15.patch, KAFKA-1634_2015-01-21_16:43:01.patch, KAFKA-1634_2015-01-22_18:47:37.patch, KAFKA-1634_2015-01-23_16:06:07.patch, KAFKA-1634_2015-02-06_11:01:08.patch

From the mailing list - following up on this -- I think the online API docs for OffsetCommitRequest still incorrectly refer to client-side timestamps: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest Wasn't that removed, and isn't it now always handled server-side? Would one of the devs mind updating the API spec wiki?

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31568: Patch for KAFKA-1989
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review76899
---

core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment124592
This function is no longer triggered in the new purgatory; we need to add it back since it is instantiated in produce / fetch for recording metrics, etc.

core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment124591
Trying to purge each watcher list in each doWork will likely increase CPU usage a lot; we can reduce the CPU usage by only purging when watched() is too large.

- Guozhang Wang

On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/
---

(Updated Feb. 28, 2015, 12:14 a.m.)

Review request for kafka.

Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs
-
core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8
core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/31568/diff/

Testing
---

Thanks,
Yasuhiro Matsuda
RE: [KIP-DISCUSSION] KIP-13 Quotas
Is it possible to discuss these 3 options during the next KIP hangout?

Aditya

From: Steven Wu [stevenz...@gmail.com]
Sent: Tuesday, March 17, 2015 10:08 AM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Jay, let's say an app produces to 10 different topics. One of the topics is sent from a library. Due to some condition/bug, this lib starts to send messages over the quota. If we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. That will penalize the other 9 topics, which are within the quota. That is the unfairness point that Ewen and I were trying to make. If the broker just drops the msg and returns an error/status code that indicates the drop and why, then the producer can just move on and accept the drop. The shared buffer won't be saturated and the other 9 topics won't be penalized.

Thanks,
Steven

On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Steven,

It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster.

-Jay

On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote:

Wait. We create one kafka producer for each cluster. Each cluster can have many topics. If the producer buffer gets filled up due to a delayed response for one throttled topic, won't that penalize the other topics unfairly? It seems to me that the broker should just return an error without delay.
Sorry that I am chatting to myself :)

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote:

I think I can answer my own question. A delayed response will cause the producer buffer to become full, which then results in either thread blocking or message drops.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote:

Please correct me if I am missing something here. I am not understanding how throttling would work without cooperation/back-off from the producer. The new Java producer supports a non-blocking API. Why would a delayed response be able to slow down the producer? The producer will continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote:

I think we are really discussing two separate issues here:

1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both of these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value and can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, either intentionally or not, neither of these approaches is actually effective.

2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this is of course using a single field for response status, like the HTTP status codes, while the con is that it requires clients to handle the error codes carefully. I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for the users so that users do not need to handle the codes one-by-one.
More concretely, following Jay's example the client could write sth. like this:

if (error.isOK())
  // status code is good or the code can be simply ignored for this request type: process the request
else if (error.needsRetry())
  // throttled, transient error, etc.: retry
else if (error.isFatal())
  // non-retriable errors, etc.: notify / terminate / other handling

Only when the clients really want to handle, for example, the FailDuetoThrottled status code specifically does it need to:

if (error.isOK())
  // status code is good or the code can be simply ignored for this request type: process the request
else if (error == FailDuetoThrottled)
  // throttled: log it
else if (error.needsRetry())
  // transient error,
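A minimal sketch of the wrapped-error-code idea described above might look like the following. The class name, status codes, and method names are all hypothetical illustrations, not Kafka's actual protocol types:

```java
public class ResponseError {
    // Hypothetical status codes for the sketch; not actual Kafka error codes.
    public static final short NONE = 0;
    public static final short FAIL_DUE_TO_THROTTLED = 1;
    public static final short REQUEST_TIMED_OUT = 2;
    public static final short UNKNOWN = 3;

    private final short code;

    public ResponseError(short code) { this.code = code; }

    public short code() { return code; }

    // Coarse-grained helpers so most callers never match codes one by one;
    // callers that care about a specific code can still compare code() directly.
    public boolean isOK()       { return code == NONE; }
    public boolean needsRetry() { return code == FAIL_DUE_TO_THROTTLED || code == REQUEST_TIMED_OUT; }
    public boolean isFatal()    { return !isOK() && !needsRetry(); }
}
```

The point of the design is that new retriable codes can be added on the broker side without every client needing a new branch: clients that only check isOK()/needsRetry()/isFatal() keep working unchanged.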
[jira] [Commented] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests
[ https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367396#comment-14367396 ]

Jun Rao commented on KAFKA-1927:
---

In order to work on this jira, we will need to merge KAFKA-1841 into trunk, which is waiting on KAFA-1634.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Andrii,

Thanks for the summary.

1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up.

2. Thinking about this a bit more, if the semantics of those write requests are async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously did. This will simplify the implementation.

8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics.

Thanks,
Jun

On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Guys,

Thanks for a great discussion! Here are the action points:

1. Q: Get rid of all scala request objects, use java protocol definitions.
A: Gwen kindly took that (KAFKA-1927). It's important to speed up the review procedure there since this ticket blocks other important changes.

2. Q: Generic re-routing facility vs client maintaining cluster state.
A: Jay has added pseudo code to KAFKA-1912 - need to consider whether this will be easy to implement as a server-side feature (comments are welcome!).

3. Q: Controller field in wire protocol.
A: This might be useful for clients; add this to TopicMetadataResponse (already in KIP).

4. Q: Decoupling topic creation from TMR.
A: I will add the solution proposed by Jun (using clientId for that) to the KIP.

5.
Q: Bumping new versions of TMR vs grabbing all protocol changes in one version.
A: It was decided to try to gather all changes to the protocol (before release). In the case of TMR it's worth checking: KAFKA-2020 and KIP-13 (quotas).

6. Q: A JSON lib is needed to deserialize the user's input in the CLI tool.
A: Use jackson for that; the /tools project is a separate jar, so it shouldn't be a big deal.

7. Q: VerifyReassignPartitions vs a generic status check command.
A: For long-running requests like reassign partitions a *progress* check request is useful, so it makes sense to introduce it.

Please add to / correct me if I missed something.

Thanks,
Andrii Biletskyi

On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Joel,

You are right, I removed ClusterMetadata because we have partially what we need in TopicMetadata. Also, as Jay pointed out earlier, we would like to have an orthogonal API, but at the same time we need to be backward compatible.

But I like your idea and even have some other arguments for this option: there is also DescribeTopicRequest, which was proposed in this KIP; it returns topic configs, partitions, and replication factor, plus partition ISR, ASR, and leader replica. The latter part is really already there in TopicMetadataRequest. So again we'll have to add stuff to TMR, so as not to duplicate some info in newly added requests. However, this way we'll end up with a monster request which returns cluster metadata, topic replication and config info, plus partition replication data. It seems logical to split TMR into:
- ClusterMetadata (brokers + controller, maybe smth else)
- TopicMetadata (topic info + partition details)

But since the current TMR is involved in lots of places (including the network client, as I understand), this might be a very serious change and it probably makes sense to stick with the current approach.
Thanks, Andrii Biletskyi

On Tue, Mar 17, 2015 at 5:29 PM, Joel Koshy jjkosh...@gmail.com wrote:
I may be missing some context but hopefully this will also be covered today: I thought the earlier proposal, where there was an explicit ClusterMetadata request, was clearer and more explicit. During the course of this thread I think the conclusion was that the main need was for controller information, and that can be rolled into the topic metadata response, but that seems a bit irrelevant to topic metadata. FWIW I think the full broker list is also irrelevant to topic metadata, but it is already there and in use. I think there is still room for an explicit ClusterMetadata request since there may be other cluster-level information that we may want to add over time (and that has nothing to do with topic metadata).

On Tue, Mar 17, 2015 at 02:45:30PM +0200, Andrii Biletskyi wrote:
Jun, 101. Okay, if you say that such a use case is important. I also think using clientId for these purposes is fine - if we already have this
RE: [DISCUSS] KIP-5 - Broker Configuration Management
Hi Andrii, Thanks for the writeup. IMO, we should be able to support dynamically changing certain configs. For example, the quota proposal relies on such a mechanism for changing quotas on the fly. Aditya

From: Andrii Biletskyi [andrii.bilets...@stealth.ly] Sent: Tuesday, March 03, 2015 10:30 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-5 - Broker Configuration Management

Hey Jun, Initially I was thinking that, instead of steps 3-4, global config changes would take effect only after a broker restart. So it's just:
3-4. On each broker startup, apply the global config from ZK.
In other words, the comprehensive workflow is the following:
1. Issue ChangeGlobalConfigRequest
2. Controller validates / stores config in ZK (out of scope of this KIP)
3. Do a rolling restart of the brokers one by one to pick up config changes
My understanding is that we won't be able to handle config changes dynamically as we do for log config. The main reason, from my point of view, is that the broker config includes settings such as num.io.threads, updating which would mean gracefully restarting some of the broker's components (in this case the SocketServer), which in turn might be very tricky.
Thanks, Andrii Biletskyi

On Tue, Mar 3, 2015 at 7:43 PM, Jun Rao j...@confluent.io wrote:
It seems the proposed workflow is the following:
1. Client issues a global config update request to the broker.
2. Broker writes the new config to ZK.
3. Controller picks up the changes from ZK.
4. Controller propagates the config changes to all brokers.
Do we need to add a new request/response to propagate the config changes? Also, this logic is a bit different from how per-topic config changes work: each broker reads from ZK directly. It would be good to make them consistent.
Thanks, Jun On Wed, Jan 21, 2015 at 10:34 PM, Joe Stein joe.st...@stealth.ly wrote: Created a KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-5+-+Broker+Configuration+Management JIRA https://issues.apache.org/jira/browse/KAFKA-1786 /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop /
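For what it's worth, the four-step workflow Jun outlines can be sketched end to end. The sketch below is hypothetical: an in-memory store stands in for ZooKeeper, and names like `FakeConfigStore` and `ConfigListener` are illustrative only, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConfigChangeSketch {
    interface ConfigListener { void onConfigChange(Map<String, String> newConfig); }

    // Stand-in for the ZK path that brokers would watch for global config.
    static class FakeConfigStore {
        private final Map<String, String> config = new HashMap<>();
        private final List<ConfigListener> watchers = new ArrayList<>();

        void watch(ConfigListener l) { watchers.add(l); }

        // Step 2: the broker writes the new config to the store...
        // Steps 3-4: ...and the watchers (brokers) pick the change up.
        void write(Map<String, String> update) {
            config.putAll(update);
            for (ConfigListener l : watchers) l.onConfigChange(new HashMap<>(config));
        }
    }

    static class Broker implements ConfigListener {
        final Map<String, String> applied = new HashMap<>();
        public void onConfigChange(Map<String, String> newConfig) {
            // Only some settings can be applied without a restart (quotas yes;
            // num.io.threads likely not, per the discussion above).
            applied.putAll(newConfig);
        }
    }

    public static void main(String[] args) {
        FakeConfigStore store = new FakeConfigStore();
        Broker b1 = new Broker(), b2 = new Broker();
        store.watch(b1);
        store.watch(b2);
        store.write(Map.of("log.retention.ms", "86400000")); // step 1: client request
        System.out.println(b1.applied.get("log.retention.ms"));
        System.out.println(b2.applied.equals(b1.applied));
    }
}
```

This mirrors the per-topic config path Jun mentions (each broker reacting to the ZK change directly), which is the consistency he is asking for.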
[jira] [Commented] (KAFKA-1716) hang during shutdown of ZookeeperConsumerConnector
[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367361#comment-14367361 ] Ashwin Jayaprakash commented on KAFKA-1716:
---
Sorry for the delay, I will get back to you on this ASAP.

hang during shutdown of ZookeeperConsumerConnector
--
Key: KAFKA-1716
URL: https://issues.apache.org/jira/browse/KAFKA-1716
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.1.1
Reporter: Sean Fay
Assignee: Neha Narkhede
Attachments: after-shutdown.log, before-shutdown.log, kafka-shutdown-stuck.log

It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to wedge in the case that some consumer fetcher threads receive messages during the shutdown process.

Shutdown thread:
{code}
-- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
    at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
    at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
    at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
    at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
    at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
    at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
    at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
    at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
    ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
    at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
    at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
    at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
{code}

ConsumerFetcherThread:
{code}
-- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
    at jrockit/vm/Locks.park0(J)V(Native Method)
    at jrockit/vm/Locks.park(Locks.java:2230)
    at sun/misc/Unsafe.park(ZJ)V(Native Method)
    at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
    at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
    at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
    at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
    at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
    at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
    at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
    at kafka/utils/Utils$.inLock(Utils.scala:538)
    at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
    at
{code}
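For what it's worth, the wedge in the second trace comes from the unbounded `LinkedBlockingQueue.put()` in `PartitionTopicInfo.enqueue`: the fetcher blocks on a full queue while the shutdown thread waits on the fetcher's latch. The sketch below demonstrates one generic remedy for this pattern — a bounded `offer()` plus a shutdown flag — and is illustrative only, not the actual KAFKA-1716 fix.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownSafeEnqueue {
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(1);
        chunks.put("chunk-0");                  // queue is now full
        AtomicBoolean shuttingDown = new AtomicBoolean(false);

        Thread fetcher = new Thread(() -> {
            try {
                // Instead of a blocking put() (where the ConsumerFetcherThread
                // parks forever above), poll the shutdown flag between bounded
                // offer() attempts so shutdown can always make progress.
                while (!shuttingDown.get()) {
                    if (chunks.offer("chunk-1", 50, TimeUnit.MILLISECONDS)) break;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();
        Thread.sleep(200);            // let the fetcher spin on the full queue
        shuttingDown.set(true);       // shutdown: flag flips, fetcher exits
        fetcher.join(2000);
        System.out.println(fetcher.isAlive()); // false: no wedge
    }
}
```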
Re: [Discussion] Using Client Requests and Responses in Server
Sorry, Andrii, I'm a bit confused. The following request/response pairs are currently not implemented in the client: StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Does KIP-4 require any of these? Also, what's TMR? Gwen

On Wed, Mar 18, 2015 at 4:07 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:
Gwen, Thanks for bringing this up! Regarding UpdateMetadata in KIP-4 - no, it shouldn't be used in the Admin CLI; it's an internal server message. We will probably use TMR there (depending on how the generic re-routing facility goes), but TMR is already used in NetworkClient, so I believe there are no doubts about it - it should be ported to Java. Thanks, Andrii Biletskyi

On Wed, Mar 18, 2015 at 3:13 AM, Jiangjie Qin j...@linkedin.com.invalid wrote:
I think those two requests are only used for controller-to-broker communication. Not sure if the client side will need them in KIP-4; unlikely, I guess. Jiangjie (Becket) Qin

On 3/17/15, 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:
Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: the LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the Scala implementation for those? Gwen
Re: [Discussion] Using Client Requests and Responses in Server
Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
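A minimal sketch of steps 2.1-2.2 above (parse the fixed header, then dispatch on the api key to a request-specific deserializer). The field layout follows the wire protocol of that era — int16 apiKey, int16 apiVersion, int32 correlationId, then a length-prefixed clientId string — but the `Header` record and `parse()` helper here are illustrative stand-ins, not the actual client classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class RequestHeaderParse {
    record Header(short apiKey, short apiVersion, int correlationId, String clientId) {}

    // Step 2.1: read the fixed request header from the buffer.
    static Header parse(ByteBuffer buf) {
        short apiKey = buf.getShort();
        short apiVersion = buf.getShort();
        int correlationId = buf.getInt();
        short len = buf.getShort();               // clientId is length-prefixed
        byte[] id = new byte[len];
        buf.get(id);
        return new Header(apiKey, apiVersion, correlationId,
                          new String(id, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] client = "admin-cli".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(10 + client.length);
        buf.putShort((short) 3);      // apiKey 3 = TopicMetadataRequest
        buf.putShort((short) 0);      // apiVersion
        buf.putInt(42);               // correlationId
        buf.putShort((short) client.length);
        buf.put(client);
        buf.flip();
        Header h = parse(buf);
        System.out.println(h.apiKey() + " " + h.correlationId() + " " + h.clientId());
        // Step 2.2 would then dispatch on h.apiKey() to the request-specific
        // deserializer; the remaining bytes in buf are the request body.
    }
}
```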
Re: Review Request 31568: Patch for KAFKA-1989
On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
core/src/main/scala/kafka/server/DelayedOperation.scala, line 116 https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line116
We need to make tickMs and wheelSize configurable.

Yasuhiro Matsuda wrote: What is the motivation? I don't think it is a good idea to allow users to configure them.

Guozhang Wang wrote: I am not concerned about user-configurability. The purgatory is used by multiple request types: produce, fetch, and in the future rebalance, heartbeat and join group; different request types may need to set tickMs and wheelSize differently.

It is easy to add parameters to DelayedOperationPurgatory since Timer already has them. But I don't see any compelling reason to do it now. The hierarchical timing wheel is very robust to varying timeout requests by design.

On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192 https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187
TBD

Guozhang Wang wrote: Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit, indicating whether the task has not expired and was successfully added to the timer. And then we can change the above as:
if (!operation.isComplete()) { if (!timeoutTimer.add(operation)) { operation.cancel() } }

An expired task is always successfully added to the timer and executed immediately. Do you mean completed instead of expired? TimerTask does not have a notion of completion. I kept the completion concept out of the Timer implementation since it is not essential to the Timer functionality.

On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
core/src/main/scala/kafka/utils/timer/Timer.scala, line 68 https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68
I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be less than bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?

Yasuhiro Matsuda wrote: That is true only for the lowest wheel. Reinsert is necessary to make the timing wheels work. A bucket from a higher wheel may contain tasks that are not expired (a tick time is longer in a higher wheel).

Guozhang Wang wrote: OK, I may be missing sth. here, but this is my reasoning: the bucket is only returned from the delayed queue in line 62 if its expiration time has passed currentTime; after that, at least the lowest wheel will advance to its expiration time, and hence the add call within the reinsert is doomed to fail, as task.expirationTime < wheel's time + tickMs.

If the expired bucket is from the lowest wheel, all tasks in the bucket are expired, and reinsert submits them to a thread pool for execution. If the expired bucket is from a higher wheel, tasks are either expired or not expired; reinsert submits the expired tasks to a thread pool and moves the unexpired tasks to lower wheels.

On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72 https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72
It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted, its correspondence to the task will not change, right? If that is true we can just set the entry for the task in the constructor of the task entry.

Yasuhiro Matsuda wrote: This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently of a Timer, then enqueued to a Timer.

Guozhang Wang wrote: Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer, when we create the TimerTaskEntry with the passed-in TimerTask, its entry field will be set automatically.

If a task already enqueued to a timer is enqueued again, intentionally or unintentionally (= bug), what happens?
My intention here is to keep the data structure consistent in such a case. setTimerTaskEntry removes the old entry if it exists.

- Yasuhiro

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review76459 ---

On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ ---
(Updated Feb. 28, 2015, 12:14 a.m.)
Review request for kafka.
Bugs: KAFKA-1989 https://issues.apache.org/jira/browse/KAFKA-1989
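To make the reinsert discussion above concrete, here is a toy two-wheel sketch (a didactic illustration, not the KAFKA-1989 implementation): flushing an expired higher-wheel bucket with `reinsert()` executes the tasks that have already expired and demotes the still-pending ones to the lower wheel.

```java
import java.util.ArrayList;
import java.util.List;

class TimingWheelSketch {
    static final long TICK = 1, WHEEL_SIZE = 8;   // lowest wheel: 8 x 1ms buckets
    static long now = 0;
    static final List<String> fired = new ArrayList<>();

    static class Task {
        final String name; final long expirationMs;
        Task(String n, long e) { name = n; expirationMs = e; }
    }

    static List<Task> lowWheel = new ArrayList<>(), highWheel = new ArrayList<>();

    // add() returns false when the task has already expired, in which case the
    // caller submits it for execution -- exactly the reinsert contract debated above.
    static boolean add(Task t) {
        if (t.expirationMs <= now) return false;              // already expired
        if (t.expirationMs < now + TICK * WHEEL_SIZE) lowWheel.add(t);
        else highWheel.add(t);                                // overflow (higher) wheel
        return true;
    }

    static void reinsert(Task t) { if (!add(t)) fired.add(t.name); }

    public static void main(String[] args) {
        add(new Task("c", 9));    // both land in the higher wheel at now = 0
        add(new Task("b", 15));
        now = 10;                 // clock advances; the high bucket has expired
        List<Task> bucket = new ArrayList<>(highWheel);
        highWheel.clear();
        for (Task t : bucket) reinsert(t);    // flush the expired bucket
        System.out.println(fired);            // "c" expired -> executed
        System.out.println(lowWheel.get(0).name); // "b" demoted to the lower wheel
    }
}
```

This is Yasuhiro's point in miniature: for a lowest-wheel bucket every reinsert fails (all tasks run), but for a higher-wheel bucket reinsert is what moves unexpired tasks down.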
Re: [Discussion] Using Client Requests and Responses in Server
Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you:
1. Get the bytes from network (kinda unavoidable...)
2. Modify RequestChannel.Request to contain header and body (instead of a single object)
3. Create the header and body from bytes as follows:
val header: RequestHeader = RequestHeader.parse(buffer)
val apiKey: Int = header.apiKey
val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
4. KafkaApis will continue getting RequestChannel.Request, but will now have access to body and header separately.
I agree that I need Request/Response objects that contain only the body for all request objects. I'm thinking of implementing them in o.a.k.common.requests in Java for consistency.
When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right?
Gwen

On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:
Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927.
1. Get the bytes from network.
2. Use a new generic approach to convert bytes into request objects.
2.1 Read the fixed request header (using the util in client).
2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new Java objects).
3. We will then be passing a header and an AbstractRequestResponse to KafkaApis.
In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in Java or Scala, but perhaps they should be only in the core project. Also note, there are some Scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those Scala objects until the old consumer is phased out. We can remove the rest of the Scala request objects.
Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
Re: Review Request 32193: Patch for KAFKA-1997
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32193/#review76910 ---
core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/32193/#comment124623
Could we use ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION ... etc. to replace the hard-coded strings?
- Guozhang Wang

On March 18, 2015, 3:40 a.m., Jiangjie Qin wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32193/ ---
(Updated March 18, 2015, 3:40 a.m.)
Review request for kafka.
Bugs: KAFKA-1997 https://issues.apache.org/jira/browse/KAFKA-1997
Repository: kafka
Description
---
Follow-up patch for KAFKA-1997, fixing a few bugs.
Diffs
-
core/src/main/scala/kafka/tools/MirrorMaker.scala 11acc3103e4e4a30d7380e26355ccba09b3304bb
Diff: https://reviews.apache.org/r/32193/diff/
Testing
---
Thanks, Jiangjie Qin
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote:
Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up.

I mentioned this in a separate thread, but it may be more relevant here: it looks like the SimpleConsumer API exposes TopicMetadataRequest and TopicMetadataResponse. This means that KAFKA-1927 doesn't remove this duplication. So I'm not sure we actually need KAFKA-1927 before implementing this KIP. This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it means we can proceed in parallel?

2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously did. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics. Thanks, Jun

On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:
Guys, Thanks for a great discussion! Here are the action points: 1. Q: Get rid of all Scala request objects, use Java protocol definitions. A: Gwen kindly took that on (KAFKA-1927). It's important to speed up the review procedure there since this ticket blocks other important changes. 2. Q: Generic re-route facility vs. the client maintaining cluster state.
[jira] [Commented] (KAFKA-1481) Stop using dashes AND underscores as separators in MBean names
[ https://issues.apache.org/jira/browse/KAFKA-1481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367878#comment-14367878 ] Guozhang Wang commented on KAFKA-1481:
--
We hit an issue related to this ticket, which adds the brokerHost / brokerPort into FetchRequestAndResponseMetrics. The root cause is that when the server starts up, it gets the local host string by calling:
{code}
InetAddress.getLocalHost.getCanonicalHostName
{code}
which will possibly just return the textual representation of the IP address if somehow accessing the local hostname is not allowed:
http://docs.oracle.com/javase/7/docs/api/java/net/InetAddress.html#getCanonicalHostName%28%29
In our case, the IPv6 address string is returned, which is registered in ZK, read by the controller, propagated to brokers through metadata updates, and eventually read by consumers. And when that happens, we got the following error:
{code}
2015-03-18 09:46:30 JmxReporter [WARN] Error processing kafka.consumer:type=FetchRequestAndResponseMetrics,name=FetchRequestRateAndTimeMs,clientId=samza_checkpoint_manager-wikipedia_parser-1-1426697189628-0,brokerHost=fe80:0:0:0:7ed1:c3ff:fee0:c60f%4,brokerPort=9092
javax.management.MalformedObjectNameException: Invalid character ':' in value part of property
    at javax.management.ObjectName.construct(ObjectName.java:618)
    at javax.management.ObjectName.init(ObjectName.java:1382)
    at com.yammer.metrics.reporting.JmxReporter.onMetricAdded(JmxReporter.java:395)
    at com.yammer.metrics.core.MetricsRegistry.notifyMetricAdded(MetricsRegistry.java:516)
    at com.yammer.metrics.core.MetricsRegistry.getOrAdd(MetricsRegistry.java:491)
    at com.yammer.metrics.core.MetricsRegistry.newTimer(MetricsRegistry.java:320)
    at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:85)
    at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
    at kafka.consumer.FetchRequestAndResponseMetrics.init(FetchRequestAndResponseStats.scala:35)
    at kafka.consumer.FetchRequestAndResponseStats$$anonfun$1.apply(FetchRequestAndResponseStats.scala:44)
    at kafka.consumer.FetchRequestAndResponseStats$$anonfun$1.apply(FetchRequestAndResponseStats.scala:44)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:61)
    at kafka.consumer.FetchRequestAndResponseStats.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:51)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:108)
    at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:283)
    at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager$$anonfun$readLog$1.apply(KafkaCheckpointManager.scala:256)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:82)
    at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readLog(KafkaCheckpointManager.scala:255)
    at org.apache.samza.checkpoint.kafka.KafkaCheckpointManager.readChangeLogPartitionMapping(KafkaCheckpointManager.scala:242)
    at org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:136)
    at org.apache.samza.coordinator.JobCoordinator.buildJobModel(JobCoordinator.scala)
    at org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController.refreshOwnership(StandaloneZkCoordinatorController.java:161)
    at org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController.access$900(StandaloneZkCoordinatorController.java:49)
    at org.apache.samza.job.standalone.controller.StandaloneZkCoordinatorController$ContainerPathListener.handleChildChange(StandaloneZkCoordinatorController.java:256)
    at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{code}
I think the right solution here is that the brokerHost string should also be canonicalized before adding it to the sensor tags. [~vladimir.tretyakov] [~junrao] what do you think?
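A small self-contained demonstration of the failure above and one JMX-level guard: the unquoted IPv6 literal reproduces the MalformedObjectNameException from the log, while `ObjectName.quote()` yields a well-formed name. (Canonicalizing the registered host string, as suggested in the comment, is the more fundamental fix; this only illustrates the JMX behavior.)

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class MBeanNameQuoting {
    public static void main(String[] args) throws MalformedObjectNameException {
        String host = "fe80:0:0:0:7ed1:c3ff:fee0:c60f%4"; // IPv6 literal from the log

        // Unquoted, the ':' characters in the value make the name invalid:
        boolean failed = false;
        try {
            new ObjectName("kafka.consumer:type=FetchRequestAndResponseMetrics,brokerHost=" + host);
        } catch (MalformedObjectNameException e) {
            failed = true; // "Invalid character ':' in value part of property"
        }
        System.out.println(failed);

        // ObjectName.quote() escapes the value so the name is well-formed.
        ObjectName ok = new ObjectName(
            "kafka.consumer:type=FetchRequestAndResponseMetrics,brokerHost="
            + ObjectName.quote(host));
        System.out.println(ok.getKeyProperty("brokerHost").startsWith("\""));
    }
}
```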
Stop using dashes AND underscores as separators in MBean names -- Key: KAFKA-1481 URL: https://issues.apache.org/jira/browse/KAFKA-1481 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Otis Gospodnetic Assignee: Vladimir Tretyakov Priority: Critical Labels: patch Fix For: 0.8.2.0, 0.8.3 Attachments: KAFKA-1481_2014-06-06_13-06-35.patch, KAFKA-1481_2014-10-13_18-23-35.patch, KAFKA-1481_2014-10-14_21-53-35.patch, KAFKA-1481_2014-10-15_10-23-35.patch, KAFKA-1481_2014-10-20_23-14-35.patch, KAFKA-1481_2014-10-21_09-14-35.patch, KAFKA-1481_2014-10-30_21-35-43.patch, KAFKA-1481_2014-10-31_14-35-43.patch, KAFKA-1481_2014-11-03_16-39-41_doc.patch, KAFKA-1481_2014-11-03_17-02-23.patch, KAFKA-1481_2014-11-10_20-39-41_doc.patch, KAFKA-1481_2014-11-10_21-02-23.patch,
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Joel, I'm totally behind your arguments concerning adding irrelevant stuff to TopicMetadataRequest, and also about having a bloated request. Personally I'd go with a separate ClusterMetadataRequest (CMR); actually, this was our initial proposal. But since the second part of the request - brokers - is already present in TopicMetadataResponse (TMR), I agreed to augment TMR instead of introducing a separate request. The only thing which should be considered, though, is the Kafka producer / consumer: if we split TMR into topic metadata and cluster metadata (brokers + controller), we need to think about whether it's okay if clients have to issue two separate requests to maintain Metadata.java (in terms of potential concurrency issues). Can someone please clarify this question? Thanks, Andrii Biletskyi

On Wed, Mar 18, 2015 at 8:58 PM, Joel Koshy jjkosh...@gmail.com wrote:
(Thanks Andrii for the summary) For (1) yes, we will circle back on that shortly after syncing up in person. I think it is close to getting committed, although development for KAFKA-1927 can probably begin without it. There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.)
I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this, and instead adding it to the topic metadata response, was convenience - i.e., you don't have to discover the controller in advance.
However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). If we are to augment the topic metadata response, then the flow would be something like this:
- Issue a topic metadata request to any broker (and discover the controller)
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.
With an explicit cluster metadata request it would be:
- Issue a cluster metadata request to any broker
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request
So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle to this is the following: is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By the same token, should we also allow querying for the consumer coordinator (and in the future the transaction coordinator) as well? This leads to a bloated request, which isn't very appealing and is altogether confusing. Thanks, Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:
Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller.
Instead, the receiving broker can just write the operation to ZK, as the admin command line tool currently does. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics, but it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics. Thanks, Jun On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, Thanks for a great discussion! Here are the action points: 1. Q: Get rid of all scala requests objects, use java protocol definitions. A: Gwen kindly took that (KAFKA-1927). It's important to speed up review procedure
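Jun's async semantics can be sketched in miniature: the broker records the requested operation and acknowledges initiation only, and the client checks completion with a follow-up request. Everything below is illustrative, not real Kafka code; a plain map stands in for the ZK admin path.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncAdminSketch {
    // Stand-in for the ZK admin paths (/admin/reassign_partitions and friends).
    private final Map<String, String> zk = new ConcurrentHashMap<>();

    // Handling the admin request: write to "ZK" and acknowledge initiation only.
    String handleReassign(String json) {
        zk.put("/admin/reassign_partitions", json);
        return "INITIATED"; // not "COMPLETED" - the controller acts on the znode later
    }

    // The status check the client issues afterwards. In real Kafka the controller
    // deletes the znode once reassignment finishes.
    boolean completed() {
        return !zk.containsKey("/admin/reassign_partitions");
    }

    public static void main(String[] args) {
        AsyncAdminSketch broker = new AsyncAdminSketch();
        System.out.println(broker.handleReassign("{...}")); // prints INITIATED
        System.out.println(broker.completed());             // prints false: still pending
    }
}
```

The point of the design is that the receiving broker never blocks on the controller; the only synchronous work is a small ZK write.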
[jira] [Created] (KAFKA-2029) Improving controlled shutdown for rolling updates
Dmitry Bugaychenko created KAFKA-2029: - Summary: Improving controlled shutdown for rolling updates Key: KAFKA-2029 URL: https://issues.apache.org/jira/browse/KAFKA-2029 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Neha Narkhede Priority: Critical Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). The problems are: # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases high watermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*. 
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest: {code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state
          // ("Controller to broker state change requests batch is not empty while creating a new one")
          //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code} # When the broker that is the controller starts to shut down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, and the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala: {code}
// ODKL Patch to prevent deadlocks in shutdown.
/**
 * Execute the given function inside the lock
 */
def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
  if (isRunning || lock.isHeldByCurrentThread) {
    // TODO: Configure timeout.
    if (!lock.tryLock(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
    }
    try {
      fun
    } finally {
      lock.unlock()
    }
  } else {
    throw new IllegalStateException("Controller is not running, not allowed to lock.")
  }
}

private def checkAndTriggerPartitionRebalance(): Unit = {
  // Use inLockIfRunning here instead of inLock
}
{code} # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector act in a way that they prefer the oldest replica in the ISR (the one that joined the ISR first). In case of a rolling update it means moving partitions to the tail which
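The core of the queue fix above is the standard offer-with-timeout pattern from java.util.concurrent: `offer` returns false instead of blocking forever when the queue stays full. A minimal self-contained demonstration (queue size and timeout chosen arbitrarily for the sketch):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedOfferDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("pending request"); // fill the queue to capacity

        // With no consumer draining the queue (the "dead broker" case), a plain
        // put() would block forever; offer() with a timeout gives up and reports
        // failure, letting the caller log an error instead of deadlocking.
        boolean sent = queue.offer("next request", 100, TimeUnit.MILLISECONDS);
        System.out.println(sent); // prints false
    }
}
```

This is exactly why the patch can log and continue rather than hang the controller thread.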
[jira] [Updated] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Bugaychenko updated KAFKA-2029: -- Description: Controlled shutdown as currently implemented can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start again. Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see discussion there). Note 2: The problems described here can occur in any setup, but they are extremely painful in setups with large brokers (36 disks, 1000+ assigned partitions per broker in our case). Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single threaded, or even get rid of it in favour of ZK leader elections for partitions). The problems are: # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests, finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock waiting for free space in the controller-to-broker queue). After the broker starts back up it receives follower requests and erases high watermarks (with a message that the replica does not exist - the controller hadn't yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing high watermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. 
*Proposed solution: introduce a dedicated config parameter for this timeout with a high default*. # If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest: {code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state
          // ("Controller to broker state change requests batch is not empty while creating a new one")
          //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code} # When the broker that is the controller starts to shut down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, and the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala: {code}
// ODKL Patch to prevent deadlocks in shutdown.
/**
 * Execute the given function inside the lock
 */
def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
  if (isRunning || lock.isHeldByCurrentThread) {
    // TODO: Configure timeout.
    if (!lock.tryLock(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
    }
    try {
      fun
    } finally {
      lock.unlock()
    }
  } else {
    throw new IllegalStateException("Controller is not running, not allowed to lock.")
  }
}

private def checkAndTriggerPartitionRebalance(): Unit = {
  // Use inLockIfRunning here instead of inLock
}
{code} # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector act in a way that they prefer the oldest replica in the ISR (the one that joined the ISR first). In case of a rolling update it means moving partitions to the tail which increases the overall amount of movements and finally
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
I'm +1 on Jun's suggestion as long as it can work for all the requests. On Wed, Mar 18, 2015 at 3:35 PM, Jun Rao j...@confluent.io wrote: Andrii, I think we agreed on the following. (a) Admin requests can be sent to and handled by any broker. (b) Admin requests are processed asynchronously, at least for now. That is, when the client gets a response, it just means that the request is initiated, but not necessarily completed. Then, it's up to the client to issue another request to check the status for completion. To support (a), we were thinking of doing request forwarding to the controller (utilizing KAFKA-1912). I am making an alternative proposal. Basically, the broker can just write to ZooKeeper to inform the controller about the request. For example, to handle partitionReassignment, the broker will just write the requested partitions to /admin/reassign_partitions (like what AdminUtils currently does) and then send a response to the client. This shouldn't take long and the implementation will be simpler than forwarding the requests to the controller through RPC. Thanks, Jun On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Jun, I might be wrong, but didn't we agree we will let any broker from the cluster handle *long-running* admin requests (at this time preferredReplica and reassignPartitions) via the zk admin path? Thus CreateTopics etc. should be sent only to the controller. Thanks, Andrii Biletskyi On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote: Joel, Andrii, I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So, perhaps we can proceed by not making any change to the format of TMR right now. When we start using the create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later. 
As a first cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side. Thanks, Jun On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote: (Thanks Andrii for the summary) For (1) yes we will circle back on that shortly after syncing up in person. I think it is close to getting committed although development for KAFKA-1927 can probably begin without it. There is one more item we covered at the hangout. i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field: - It is irrelevant to topic metadata - If we finally do request rerouting in Kafka then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.) I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this and instead adding it to the topic metadata response was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). 
If we are to augment the topic metadata response then the flow would be something like this: - Issue a topic metadata request to any broker (and discover the controller) - Connect to the controller if required (i.e., if the broker above != controller) - Issue the partition reassignment request to the controller. With an explicit cluster metadata request it would be: - Issue a cluster metadata request to any broker - Connect to the controller if required (i.e., if the broker above != controller) - Issue the partition reassignment request So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By that same token should we also allow querying for the consumer coordinator (and
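For reference, the data a broker would write under /admin/reassign_partitions in Jun's first-cut approach is a small JSON document. A sketch of building it (the topic, partition, and replica values are examples; the layout matches what AdminUtils writes, to the best of my knowledge of the 0.8.x format):

```java
public class ReassignJson {
    // Build one {"topic":...,"partition":...,"replicas":[...]} entry.
    static String entry(String topic, int partition, int[] replicas) {
        StringBuilder r = new StringBuilder();
        for (int i = 0; i < replicas.length; i++) {
            if (i > 0) r.append(",");
            r.append(replicas[i]);
        }
        return String.format("{\"topic\":\"%s\",\"partition\":%d,\"replicas\":[%s]}",
                             topic, partition, r);
    }

    public static void main(String[] args) {
        // The full znode payload: a versioned wrapper around the partition list.
        String json = "{\"version\":1,\"partitions\":["
                    + entry("foo", 0, new int[]{1, 2}) + "]}";
        System.out.println(json);
    }
}
```

Once this is written, the controller's existing ZK watch on the path picks the work up, which is why no broker-to-controller RPC is needed.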
Re: [Discussion] Using Client Requests and Responses in Server
We can't rip them out completely, unfortunately - the SimpleConsumer uses them. So we'll need conversion at some point. I'll try to make the conversion point just before hitting a public API that we can't modify, and hopefully it won't look too arbitrary. On Wed, Mar 18, 2015 at 5:24 PM, Jay Kreps jay.kr...@gmail.com wrote: I think either approach is okay in the short term. However our goal should be to eventually get rid of that duplicate code, so if you are up for just ripping and cutting that may get us there sooner. -Jay On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks! Another clarification: The Common request/responses use slightly different infrastructure objects: Node instead of Broker, TopicPartition instead of TopicAndPartition, and a few more. I can write utilities to convert Node to Broker to minimize the scope of the change. Or I can start replacing Brokers with Nodes across the board. I'm currently taking the second approach - i.e., if MetadataRequest is now returning Node, I'm changing the entire line of dependencies to use Nodes instead of Brokers. Is this acceptable, or do we want to take a more minimal approach for this patch and do a larger replacement as a follow up? Gwen On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote: Great. For (3) yeah I think we should just think through the end-to-end pattern for these versioned requests since it seems like we will have a number of them. The serialization code used as you described gets us to the right Struct which the user would then wrap in something like ProduceRequest. Presumably there would just be one ProduceRequest that would internally fill in things like null or otherwise adapt the struct to a usable object. On the response side we would have the version from the request to use for correct versioning. 
One question is whether this is enough or whether we need to have switches in KafkaApis to do things like if (produceRequest.version == 3) { /* do something */ } else { /* do something else */ }. Basically it would be good to be able to write a quick wiki, like 'how to add or modify a Kafka API', that explained the right way to do all this. I don't think any of this necessarily blocks this ticket since at the moment we don't have tons of versions of requests out there. -Jay On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote: See inline responses: On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think: 1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say, for the internal requests we can just delete the scala code. The old clients will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. SimpleConsumer is obviously stuck with the old request/response. The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense? 2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment. Yep. 3. We should think a little about how versioning will work. 
Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned). The way I see it working (I just started on this, so I may have gaps): * Request header contains the version * When we read the request, we use ProtoUtils.requestSchema which takes version as a parameter and is responsible to give us the right Schema, which we use to read the buffer and get the correct struct. * KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc. Does that sound about right? 4. Ideally after this refactoring the network package should not be dependent on the individual request
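Gwen's three bullets can be condensed into a sketch: the version carried in the request header selects the schema used to parse the body, and the handler keeps the header around to build a matching response. Schema and requestSchema below are simplified stand-ins for the real protocol classes, and the v1 field name is invented for illustration.

```java
public class VersionedParseSketch {
    // Simplified stand-in for the protocol Schema type.
    static class Schema {
        final String[] fields;
        Schema(String... fields) { this.fields = fields; }
    }

    // Stand-in for a ProtoUtils.requestSchema-style lookup: the header's
    // version picks which schema is used to read the buffer.
    static Schema requestSchema(int version) {
        if (version == 0) {
            return new Schema("topics");
        } else if (version == 1) {
            // A later version might add a field; this name is made up for the sketch.
            return new Schema("topics", "allow_auto_topic_creation");
        }
        throw new IllegalArgumentException("unknown version " + version);
    }

    public static void main(String[] args) {
        int headerVersion = 1;                    // taken from the request header
        Schema schema = requestSchema(headerVersion);
        System.out.println(schema.fields.length); // prints 2: the v1 schema was chosen
    }
}
```

The appeal of this shape is that version dispatch lives in one lookup rather than being scattered through KafkaApis as per-handler switches.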
[jira] [Commented] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0
[ https://issues.apache.org/jira/browse/KAFKA-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368224#comment-14368224 ] Ewen Cheslack-Postava commented on KAFKA-2030: -- It looks like the homebrew bottle was compiled with Java 8 and you have a different version. The homebrew package isn't maintained here. The packaging script that generates the package is at https://github.com/Homebrew/homebrew/blob/master/Library/Formula/kafka.rb and you'll want to file a bug against that repository. Not sure if homebrew has a build_depends in addition to their depends settings, which is probably how you'd have to enforce something like building with JDK6 to get full compatibility. Not sure which version you're trying to run with, but it looks like they are only supporting 1.7+, whereas Kafka can still be built with JDK6 so even if they fix the bottle to match the formula it's possible it still wouldn't work for you. You might also want to look at: https://github.com/Homebrew/homebrew/blob/master/share/doc/homebrew/Bottles.md You could probably fix this by just turning off bottles. It'll have to build everything from scratch, but should definitely be compatible with whatever Java version you're using. Creating a topic cause Unsupported major.minor version 52.0 --- Key: KAFKA-2030 URL: https://issues.apache.org/jira/browse/KAFKA-2030 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.2.0 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 1.7.0_55 Reporter: Jeff Fogarty Priority: Blocker I installed Kafka via brew. 
I attempt the following: kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra --partitions 3 --replication-factor 2 and get: Error while executing topic command org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 52.0 java.lang.UnsupportedClassVersionError: org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at kafka.server.OffsetManager$.<init>(OffsetManager.scala:357) at kafka.server.OffsetManager$.<clinit>(OffsetManager.scala) at kafka.common.Topic$.<init>(Topic.scala:29) at kafka.common.Topic$.<clinit>(Topic.scala) at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172) at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93) at kafka.admin.TopicCommand$.main(TopicCommand.scala:55) at kafka.admin.TopicCommand.main(TopicCommand.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
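For context, "major.minor version 52.0" refers to the class file format: major version 52 is Java 8, 51 is Java 7, 50 is Java 6, so a JVM older than 8 cannot load classes compiled for 8. A small sketch that decodes the major version from a class file header (here a synthetic 8-byte header rather than a real file, so the example is self-contained):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class ClassVersion {
    // Class-file major versions map to Java releases as major - 44 (52 -> 8).
    static String javaVersionFor(int major) {
        return "Java " + (major - 44);
    }

    public static void main(String[] args) throws IOException {
        // First 8 bytes of a Java-8-compiled class: magic (0xCAFEBABE),
        // minor_version (0), major_version (0x34 = 52).
        byte[] header = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE, 0, 0, 0, 0x34};
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(header));
        int magic = in.readInt();          // consumes the 4 magic bytes
        int minor = in.readUnsignedShort();
        int major = in.readUnsignedShort();
        System.out.println(major + " -> " + javaVersionFor(major)); // prints 52 -> Java 8
    }
}
```

This matches Ewen's diagnosis: the bottle was compiled with Java 8 (major 52) and the reporter's Java 1.7.0_55 JVM refuses to load it.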
[jira] [Resolved] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0
[ https://issues.apache.org/jira/browse/KAFKA-2030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-2030. -- Resolution: Invalid Creating a topic cause Unsupported major.minor version 52.0 --- Key: KAFKA-2030 URL: https://issues.apache.org/jira/browse/KAFKA-2030 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.2.0 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 1.7.0_55 Reporter: Jeff Fogarty Priority: Blocker -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [VOTE] KIP-7 Security - IP Filtering
The proposal sounds reasonable. Timing wise, since we plan to refactor the network layer code in the broker, perhaps this can wait until KAFKA-1928 is done? Thanks, Jun On Tue, Mar 17, 2015 at 6:56 AM, Jeff Holoman jholo...@cloudera.com wrote: bump On Tue, Mar 3, 2015 at 8:12 PM, Jeff Holoman jholo...@cloudera.com wrote: Guozhang, The way the patch is implemented, the check is done in the acceptor thread accept() method of the Socket Server, just before connectionQuotas. Thanks Jeff On Tue, Mar 3, 2015 at 7:59 PM, Guozhang Wang wangg...@gmail.com wrote: Jeff, I am wondering if the IP filtering rule can be enforced at the socket server level instead of the Kafka API level? Guozhang On Tue, Mar 3, 2015 at 2:24 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: +1 (non-binding) On 3/3/15, 1:17 PM, Gwen Shapira gshap...@cloudera.com wrote: +1 (non-binding) On Tue, Mar 3, 2015 at 12:44 PM, Jeff Holoman jholo...@cloudera.com wrote: Details in the wiki. https://cwiki.apache.org/confluence/display/KAFKA/KIP-7+-+Security+-+IP+F iltering -- Jeff Holoman Systems Engineer -- -- Guozhang -- Jeff Holoman Systems Engineer -- Jeff Holoman Systems Engineer
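A minimal sketch of the accept-time check being discussed: reject the connection in the acceptor, right after accept() and before any request processing or ConnectionQuotas bookkeeping. The exact-IP rule format here is a simplification for illustration; KIP-7 proposes configurable allow/deny rules rather than this toy set.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Set;

public class IpFilterSketch {
    private final Set<String> denied = new HashSet<>();

    void deny(String ip) { denied.add(ip); }

    // Conceptually called from the acceptor thread right after accept():
    // a denied peer is closed before the broker does any further work for it.
    boolean allowed(InetAddress peer) {
        return !denied.contains(peer.getHostAddress());
    }

    public static void main(String[] args) throws UnknownHostException {
        IpFilterSketch filter = new IpFilterSketch();
        filter.deny("10.0.0.5");
        System.out.println(filter.allowed(InetAddress.getByName("10.0.0.5"))); // prints false
        System.out.println(filter.allowed(InetAddress.getByName("10.0.0.6"))); // prints true
    }
}
```

Doing the check at the socket-server level, as Jeff describes, means a blocked client is refused before it can consume any per-connection broker resources.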
[jira] [Created] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.
Manikandan Narayanaswamy created KAFKA-2031: --- Summary: Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning. Key: KAFKA-2031 URL: https://issues.apache.org/jira/browse/KAFKA-2031 Project: Kafka Issue Type: Bug Components: packaging Affects Versions: 0.8.1.1 Reporter: Manikandan Narayanaswamy Priority: Minor The kafka-run-class.sh script expects the LOG_DIR variable to be set; if it is not, the log dir defaults to $base_dir/logs. In the event the executor of the script does not have the right permissions, this leads to errors such as: {noformat} mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission denied. {noformat} One way to make this more configurable would be to introduce kafka-env.sh; sourcing this file would export LOG_DIR and potentially other variables if need be. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Discussion] Using Client Requests and Responses in Server
I think either approach is okay in the short term. However our goal should be to eventually get rid of that duplicate code, so if you are up for just ripping and cutting that may get us there sooner.

-Jay

On Wed, Mar 18, 2015 at 5:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

Thanks! Another clarification: the common request/responses use slightly different infrastructure objects: Node instead of Broker, TopicPartition instead of TopicAndPartition, and a few more. I can write utilities to convert Node to Broker to minimize the scope of the change, or I can start replacing Brokers with Nodes across the board. I'm currently taking the second approach - i.e., if MetadataRequest now returns Node, I'm changing the entire line of dependencies to use Nodes instead of Broker. Is this acceptable, or do we want to take a more minimal approach for this patch and do a larger replacement as a follow-up?

Gwen

On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote:

Great. For (3) yeah I think we should just think through the end-to-end pattern for these versioned requests since it seems like we will have a number of them. The serialization code as you described gets us to the right Struct, which the user would then wrap in something like ProduceRequest. Presumably there would just be one ProduceRequest that would internally fill in things like null or otherwise adapt the struct to a usable object. On the response side we would have the version from the request to use for correct versioning. One question is whether this is enough or whether we need to have switches in KafkaApis to do things like if (produceRequest.version == 3) // do something, else // do something else. Basically it would be good to be able to write a quick wiki on how to add or modify a Kafka API that explained the right way to do all this. I don't think any of this necessarily blocks this ticket since at the moment we don't have tons of versions of requests out there.
-Jay

On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

See inline responses:

On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think:

1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say, for the internal requests we can just delete the scala code. The old clients will continue to use their old request definitions until we eol them. I would propose that new changes go only into the new request/response objects and the old scala ones be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen.

SimpleConsumer is obviously stuck with the old request/response. The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense?

2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment.

Yep.

3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning, but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned).
The way I see it working (I just started on this, so I may have gaps):

* The request header contains the version.
* When we read the request, we use ProtoUtils.requestSchema, which takes the version as a parameter and is responsible for giving us the right Schema, which we use to read the buffer and get the correct struct.
* KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc.

Does that sound about right?

4. Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top.

I'll make a note to validate that this is the case.

-Jay

On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira
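The header-driven dispatch sketched in the bullets above can be illustrated with a toy model (the class and map below are hypothetical stand-ins; Kafka's real ProtoUtils/Schema/Struct machinery is richer - this only shows the header's (apiKey, version) pair selecting the schema used to parse the body):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of versioned schema lookup. The string "schemas" are
// placeholders standing in for real Schema objects.
public class VersionedRequestDemo {
    static final Map<String, String> SCHEMAS = new HashMap<>();
    static {
        SCHEMAS.put("produce-v0", "acks timeout [topic [partition records]]");
        SCHEMAS.put("produce-v1", "acks timeout [topic [partition records]]");
    }

    // Analogous to a requestSchema(apiKey, version) lookup: the pair taken
    // from the request header picks the schema used to read the buffer.
    static String requestSchema(String apiKey, int version) {
        String schema = SCHEMAS.get(apiKey + "-v" + version);
        if (schema == null)
            throw new IllegalArgumentException(
                "unknown version " + version + " for " + apiKey);
        return schema;
    }

    public static void main(String[] args) {
        // A KafkaApis-style handler parses the header first, then uses its
        // version to deserialize the body into the right struct.
        System.out.println(requestSchema("produce", 1));
    }
}
```

An unknown version fails fast at parse time, which is what lets the request objects themselves stay unversioned.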
[jira] [Commented] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.
[ https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368285#comment-14368285 ] Manikandan Narayanaswamy commented on KAFKA-2031:

Great! Thanks for resolving this one.

Key: KAFKA-2031 URL: https://issues.apache.org/jira/browse/KAFKA-2031

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-1684) Implement TLS/SSL authentication
[ https://issues.apache.org/jira/browse/KAFKA-1684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani reassigned KAFKA-1684: - Assignee: Sriharsha Chintalapani (was: Ivan Lyutov) Implement TLS/SSL authentication Key: KAFKA-1684 URL: https://issues.apache.org/jira/browse/KAFKA-1684 Project: Kafka Issue Type: Sub-task Components: security Affects Versions: 0.9.0 Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Attachments: KAFKA-1684.patch, KAFKA-1684.patch Add an SSL port to the configuration and advertise this as part of the metadata request. If the SSL port is configured the socket server will need to add a second Acceptor thread to listen on it. Connections accepted on this port will need to go through the SSL handshake prior to being registered with a Processor for request processing. SSL requests and responses may need to be wrapped or unwrapped using the SSLEngine that was initialized by the acceptor. This wrapping and unwrapping is very similar to what will need to be done for SASL-based authentication schemes. We should have a uniform interface that covers both of these and we will need to store the instance in the session with the request. The socket server will have to use this object when reading and writing requests. We will need to take care with the FetchRequests as the current FileChannel.transferTo mechanism will be incompatible with wrap/unwrap so we can only use this optimization for unencrypted sockets that don't require userspace translation (wrapping). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
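The acceptor-side setup described in KAFKA-1684 can be sketched with the JDK's SSLEngine. This is a minimal illustration of the non-blocking wrap/unwrap model only, not Kafka's eventual implementation; key/trust store configuration is omitted, so an actual handshake would not complete here:

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class SslAcceptorSketch {
    public static void main(String[] args) throws Exception {
        // The SSL acceptor would create one engine per accepted connection.
        SSLContext ctx = SSLContext.getDefault();
        SSLEngine engine = ctx.createSSLEngine();
        engine.setUseClientMode(false);   // the broker acts as the TLS server
        engine.setNeedClientAuth(true);   // TLS *authentication*: require a client cert
        engine.beginHandshake();

        // A Processor would drive wrap()/unwrap() according to this status
        // until the handshake finishes, then keep the engine in the session
        // so every read is unwrapped and every write is wrapped. This is why
        // FileChannel.transferTo (sendfile) cannot be used on SSL sockets:
        // bytes must pass through wrap() in userspace.
        System.out.println(engine.getHandshakeStatus());
    }
}
```

The same status-driven loop is what makes a shared interface with SASL plausible: both schemes interpose a transform between the socket and the request bytes.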
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hey everyone,

Thanks for the great discussion. There are currently a few points on this KIP that need addressing, and I want to make sure we are on the same page about those.

1. Append-and-delay-response vs. delay-and-return-error - I think we've discussed the pros and cons of each approach but haven't chosen one yet. Where does everyone stand on this issue?

2. Metrics migration and usage in quotas - The metrics library in clients has a notion of quotas that we should reuse. For that to happen, we need to migrate the server to the new metrics package. We also need more clarification on how to compute throttling time and windowing for quotas. I'm going to start a new KIP to discuss metrics migration separately; that will also contain a section on quotas.

3. Dynamic configuration management - Being discussed in KIP-5. Basically we need something that will model default quotas and allow per-client overrides.

Is there something else that I'm missing?

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Wednesday, March 18, 2015 2:10 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances, the quota is against all of those instances added up across all their topics. So the effect would actually be the same either way, but throttling gives the producer the choice of either blocking or dropping.

-Jay

On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote:

Jay, let's say an app produces to 10 different topics, and one of the topics is sent from a library. Due to some condition/bug, this lib starts to send messages over the quota. If we go with the delayed-response approach, it will cause the whole shared RecordAccumulator buffer to fill up. That will penalize the other 9 topics which are within the quota. That is the unfairness point that Ewen and I were trying to make.
If the broker just drops the msg and returns an error/status code indicating the drop and why, then the producer can just move on and accept the drop. The shared buffer won't be saturated and the other 9 topics won't be penalized.

Thanks,
Steven

On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster.

-Jay

On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote:

Wait - we create one Kafka producer for each cluster, and each cluster can have many topics. If the producer buffer gets filled up due to delayed responses for one throttled topic, won't that penalize other topics unfairly? It seems to me that the broker should just return an error without delay. Sorry that I am chatting to myself :)

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote:

I think I can answer my own question. A delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drops.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote:

Please correct me if I am missing something here. I am not understanding how throttling would work without cooperation/back-off from the producer. The new Java producer supports a non-blocking API. Why would a delayed response be able to slow down the producer? The producer will continue to fire async sends.
On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote:

I think we are really discussing two separate issues here:

1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both of these approaches assume some kind of well-behavedness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying, whether intentionally or not, neither of these approaches is actually effective.

2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the
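One open item in Aditya's summary above is how to compute throttling time from a windowed rate measurement. A hypothetical sketch of one such computation, assuming a simple fixed window (this is an illustration, not the formula the metrics package uses):

```java
public class ThrottleSketch {
    // Delay the response just long enough that the bytes observed in the
    // window, spread over (window + delay), fall back under the quota.
    static long throttleTimeMs(double bytesInWindow, long windowMs,
                               double quotaBytesPerSec) {
        double observedBytesPerSec = bytesInWindow * 1000.0 / windowMs;
        if (observedBytesPerSec <= quotaBytesPerSec)
            return 0;   // under quota: respond immediately
        // Window length over which bytesInWindow would exactly meet the quota.
        double targetWindowMs = bytesInWindow * 1000.0 / quotaBytesPerSec;
        return Math.round(targetWindowMs - windowMs);
    }

    public static void main(String[] args) {
        // 2 MB seen in a 1 s window against a 1 MB/s quota: hold the
        // response ~1000 ms so the effective rate drops to the quota.
        System.out.println(throttleTimeMs(2_000_000, 1000, 1_000_000));
    }
}
```

Under the append-and-delay option this delay is applied after the write; under the error option the same computation could instead size a client back-off hint.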
Re: [Discussion] Using Client Requests and Responses in Server
Great. For (3) yeah I think we should just think through the end-to-end pattern for these versioned requests since it seems like we will have a number of them. The serialization code as you described gets us to the right Struct, which the user would then wrap in something like ProduceRequest. Presumably there would just be one ProduceRequest that would internally fill in things like null or otherwise adapt the struct to a usable object. On the response side we would have the version from the request to use for correct versioning. One question is whether this is enough or whether we need to have switches in KafkaApis to do things like if (produceRequest.version == 3) // do something, else // do something else. Basically it would be good to be able to write a quick wiki on how to add or modify a Kafka API that explained the right way to do all this. I don't think any of this necessarily blocks this ticket since at the moment we don't have tons of versions of requests out there.

-Jay

On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote:

See inline responses:

On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think:

1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say, for the internal requests we can just delete the scala code. The old clients will continue to use their old request definitions until we eol them. I would propose that new changes go only into the new request/response objects and the old scala ones be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. SimpleConsumer is obviously stuck with the old request/response.
The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense?

2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment.

Yep.

3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning, but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned).

The way I see it working (I just started on this, so I may have gaps):

* The request header contains the version.
* When we read the request, we use ProtoUtils.requestSchema, which takes the version as a parameter and is responsible for giving us the right Schema, which we use to read the buffer and get the correct struct.
* KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc.

Does that sound about right?

4. Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top.

I'll make a note to validate that this is the case.

-Jay

On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote:

Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you:

1. Get the bytes from network (kinda unavoidable...)
2. Modify RequestChannel.Request to contain header and body (instead of a single object)
3.
Create the header and body from bytes as follows:

  val header: RequestHeader = RequestHeader.parse(buffer)
  val apiKey: Int = header.apiKey
  val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]

4. KafkaApis will continue getting RequestChannel.Request, but will now have access to body and header separately.

I agree that I need Request/Response objects that contain only the body for all request objects. I'm thinking of implementing them in o.a.k.common.requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right?

Gwen

On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote:

Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from
Build failed in Jenkins: KafkaPreCommit #40
See https://builds.apache.org/job/KafkaPreCommit/40/changes Changes: [wangguoz] KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right -- [...truncated 3501 lines...] at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.api.test.ProducerCompressionTest.setUp(ProducerCompressionTest.scala:54) java.lang.NullPointerException at kafka.api.test.ProducerCompressionTest.tearDown(ProducerCompressionTest.scala:60) kafka.network.SocketServerTest simpleRequest PASSED kafka.network.SocketServerTest tooBigRequestIsRejected PASSED kafka.network.SocketServerTest testNullResponse PASSED kafka.network.SocketServerTest testSocketsCloseOnShutdown PASSED kafka.network.SocketServerTest testMaxConnectionsPerIp PASSED kafka.network.SocketServerTest testMaxConnectionsPerIPOverrides PASSED kafka.log.OffsetIndexTest truncate PASSED kafka.log.OffsetIndexTest randomLookupTest PASSED kafka.log.OffsetIndexTest lookupExtremeCases PASSED kafka.log.OffsetIndexTest appendTooMany PASSED kafka.log.OffsetIndexTest appendOutOfOrder PASSED kafka.log.OffsetIndexTest testReopen PASSED kafka.log.OffsetMapTest testBasicValidation PASSED kafka.log.OffsetMapTest testClear PASSED kafka.log.CleanerTest testCleanSegments PASSED kafka.log.CleanerTest testCleaningWithDeletes PASSED kafka.log.CleanerTest testCleaningWithUnkeyedMessages PASSED kafka.log.CleanerTest testCleanSegmentsWithAbort PASSED kafka.log.CleanerTest testSegmentGrouping PASSED kafka.log.CleanerTest testBuildOffsetMap PASSED kafka.log.FileMessageSetTest testWrittenEqualsRead PASSED kafka.log.FileMessageSetTest testIteratorIsConsistent PASSED kafka.log.FileMessageSetTest testSizeInBytes PASSED kafka.log.FileMessageSetTest testWriteTo PASSED kafka.log.FileMessageSetTest testFileSize PASSED kafka.log.FileMessageSetTest 
testIterationOverPartialAndTruncation PASSED kafka.log.FileMessageSetTest testIterationDoesntChangePosition PASSED kafka.log.FileMessageSetTest testRead PASSED kafka.log.FileMessageSetTest testSearch PASSED kafka.log.FileMessageSetTest testIteratorWithLimits PASSED kafka.log.FileMessageSetTest testTruncate PASSED kafka.log.LogTest testTimeBasedLogRoll PASSED kafka.log.LogTest testTimeBasedLogRollJitter PASSED kafka.log.LogTest testSizeBasedLogRoll PASSED kafka.log.LogTest testLoadEmptyLog PASSED kafka.log.LogTest testAppendAndReadWithSequentialOffsets PASSED kafka.log.LogTest testAppendAndReadWithNonSequentialOffsets PASSED kafka.log.LogTest testReadAtLogGap PASSED kafka.log.LogTest testReadOutOfRange PASSED kafka.log.LogTest testLogRolls PASSED kafka.log.LogTest testCompressedMessages PASSED kafka.log.LogTest testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED kafka.log.LogTest testMessageSetSizeCheck PASSED kafka.log.LogTest testCompactedTopicConstraints PASSED kafka.log.LogTest testMessageSizeCheck PASSED kafka.log.LogTest testLogRecoversToCorrectOffset PASSED kafka.log.LogTest testIndexRebuild PASSED kafka.log.LogTest testTruncateTo PASSED kafka.log.LogTest testIndexResizingAtTruncation PASSED kafka.log.LogTest testBogusIndexSegmentsAreRemoved PASSED kafka.log.LogTest testReopenThenTruncate PASSED kafka.log.LogTest testAsyncDelete PASSED kafka.log.LogTest testOpenDeletesObsoleteFiles PASSED kafka.log.LogTest testAppendMessageWithNullPayload PASSED kafka.log.LogTest testCorruptLog PASSED kafka.log.LogTest testCleanShutdownFile PASSED kafka.log.LogTest testParseTopicPartitionName PASSED kafka.log.LogTest testParseTopicPartitionNameForEmptyName PASSED kafka.log.LogTest testParseTopicPartitionNameForNull PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingSeparator PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingTopic PASSED kafka.log.LogTest testParseTopicPartitionNameForMissingPartition PASSED kafka.log.BrokerCompressionTest 
testBrokerSideCompression[0] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[1] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[2] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[3] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[4] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[5] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[6] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[7] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[8] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[9] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[10] PASSED kafka.log.BrokerCompressionTest testBrokerSideCompression[11] PASSED
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Andrii, I think we agreed on the following:

(a) Admin requests can be sent to and handled by any broker.

(b) Admin requests are processed asynchronously, at least for now. That is, when the client gets a response, it just means that the request is initiated, but not necessarily completed. Then it's up to the client to issue another request to check the status for completion.

To support (a), we were thinking of doing request forwarding to the controller (utilizing KAFKA-1912). I am making an alternative proposal. Basically, the broker can just write to ZooKeeper to inform the controller about the request. For example, to handle partitionReassignment, the broker will just write the requested partitions to /admin/reassign_partitions (like what AdminUtils currently does) and then send a response to the client. This shouldn't take long, and the implementation will be simpler than forwarding the requests to the controller through RPC.

Thanks,
Jun

On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Jun, I might be wrong, but didn't we agree we will let any broker from the cluster handle *long-running* admin requests (at this time preferredReplica and reassignPartitions) via the zk admin path, and thus that CreateTopics etc. should be sent only to the controller?

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:

Joel, Andrii, I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So perhaps we can proceed by not making any change to the format of TMR right now. When we start using the create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later.
As a first-cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side.

Thanks,
Jun

On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:

(Thanks Andrii for the summary)

For (1) yes, we will circle back on that shortly after syncing up in person. I think it is close to getting committed, although development for KAFKA-1927 can probably begin without it.

There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:

- It is irrelevant to topic metadata.
- If we finally do request rerouting in Kafka, then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?', as Joe mentioned.)

I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this, and instead adding it to the topic metadata response, was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment).
If we are to augment the topic metadata response, then the flow would be something like this:

- Issue a topic metadata request to any broker (and discover the controller)
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.

With an explicit cluster metadata request it would be:

- Issue a cluster metadata request to any broker
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request

So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By that same token, should we also allow querying for the consumer coordinator (and in future the transaction coordinator) as well? This leads to a bloated request which isn't very appealing and altogether confusing.

Thanks,
Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:

Andrii, Thanks for the
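Jun's ZooKeeper-based proposal in this thread amounts to having the broker write the same payload to /admin/reassign_partitions that AdminUtils writes today. A sketch of that payload construction (the JSON shape follows the 0.8-era reassignment format; the helper class below is illustrative, not Kafka code):

```java
import java.util.List;

public class ReassignPayloadSketch {
    // Builds the JSON a broker could write to /admin/reassign_partitions
    // to hand a reassignment request to the controller via ZooKeeper.
    static String reassignmentJson(String topic, int partition,
                                   List<Integer> replicas) {
        StringBuilder reps = new StringBuilder();
        for (int i = 0; i < replicas.size(); i++) {
            if (i > 0) reps.append(",");
            reps.append(replicas.get(i));
        }
        return "{\"version\":1,\"partitions\":[{\"topic\":\"" + topic
                + "\",\"partition\":" + partition
                + ",\"replicas\":[" + reps + "]}]}";
    }

    public static void main(String[] args) {
        System.out.println(reassignmentJson("my-topic", 0, List.of(1, 2, 3)));
    }
}
```

The controller's existing watch on that znode then picks up the request, which is why this path avoids the RPC-forwarding machinery of KAFKA-1912.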
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
+1 as well. I think it helps to keep the rerouting approach orthogonal to this KIP.

On Wed, Mar 18, 2015 at 03:40:48PM -0700, Jay Kreps wrote:

I'm +1 on Jun's suggestion as long as it can work for all the requests.

On Wed, Mar 18, 2015 at 3:35 PM, Jun Rao j...@confluent.io wrote:

Andrii, I think we agreed on the following. (a) Admin requests can be sent to and handled by any broker. (b) Admin requests are processed asynchronously, at least for now. That is, when the client gets a response, it just means that the request is initiated, but not necessarily completed. Then it's up to the client to issue another request to check the status for completion. To support (a), we were thinking of doing request forwarding to the controller (utilizing KAFKA-1912). I am making an alternative proposal. Basically, the broker can just write to ZooKeeper to inform the controller about the request. For example, to handle partitionReassignment, the broker will just write the requested partitions to /admin/reassign_partitions (like what AdminUtils currently does) and then send a response to the client. This shouldn't take long, and the implementation will be simpler than forwarding the requests to the controller through RPC.

Thanks,
Jun

On Wed, Mar 18, 2015 at 3:03 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Jun, I might be wrong, but didn't we agree we will let any broker from the cluster handle *long-running* admin requests (at this time preferredReplica and reassignPartitions) via the zk admin path, and thus that CreateTopics etc. should be sent only to the controller?

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote:

Joel, Andrii, I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So perhaps we can proceed by not making any change to the format of TMR right now.
When we start using the create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later. As a first-cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side.

Thanks,
Jun

On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:

(Thanks Andrii for the summary)

For (1) yes, we will circle back on that shortly after syncing up in person. I think it is close to getting committed, although development for KAFKA-1927 can probably begin without it.

There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:

- It is irrelevant to topic metadata.
- If we finally do request rerouting in Kafka, then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?', as Joe mentioned.)

I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this, and instead adding it to the topic metadata response, was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment).
If we are to augment the topic metadata response, then the flow would be something like this:

- Issue a topic metadata request to any broker (and discover the controller)
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.

With an explicit cluster metadata request it would be:

- Issue a cluster metadata request to any broker
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request

So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle
[jira] [Resolved] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.
[ https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani resolved KAFKA-2031.

Resolution: Duplicate

Key: KAFKA-2031 URL: https://issues.apache.org/jira/browse/KAFKA-2031

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Yes that is what I was alluding to when I said that if we finally do request rerouting in Kafka then the field would add little to no value. I wasn't sure if we agreed that we _will_ do rerouting or whether we agreed to evaluate it (KAFKA-1912). Andrii can you update the KIP with this? Thanks, Joel On Wed, Mar 18, 2015 at 02:55:00PM -0700, Jun Rao wrote: Joel, Andrii, I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So, perhaps we can proceed by not making any change to the format of TMR right now. When we start using create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later. As a first cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side. Thanks, Jun On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote: (Thanks Andrii for the summary) For (1) yes we will circle back on that shortly after syncing up in person. I think it is close to getting committed although development for KAFKA-1927 can probably begin without it. There is one more item we covered at the hangout. i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field: - It is irrelevant to topic metadata - If we finally do request rerouting in Kafka then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.) 
I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this and instead adding it to the topic metadata response was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). If we are to augment topic metadata response then the flow would be something like this: - Issue topic metadata request to any broker (and discover the controller) - Connect to controller if required (i.e., if the broker above != controller) - Issue the partition reassignment request to the controller. With an explicit cluster metadata request it would be: - Issue cluster metadata request to any broker - Connect to controller if required (i.e., if the broker above != controller) - Issue the partition reassignment request So it seems to add little practical value and bloats topic metadata response with an irrelevant detail. The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By that same token should we also allow querying for the consumer coordinator (and in future transaction coordinator) as well? This leads to a bloated request which isn't very appealing and altogether confusing. Thanks, Joel On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote: Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. 
This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics. Thanks,
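Jun's item 8 above — changing TopicConfigManager so it caches the topic-level configs for all topics, not just locally hosted ones, so that describe-topic can be answered from the broker cache instead of ZooKeeper — could look roughly like the following. This is a minimal sketch with hypothetical names, not the actual Kafka classes:

```java
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a cache of topic-level config overrides for ALL topics,
// kept up to date from config-change notifications (e.g. a ZK watch), so a
// DescribeTopic request can be served from memory instead of reading ZK.
public class TopicConfigCache {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    // Called whenever a topic config change is observed.
    public void update(String topic, Properties overrides) {
        configs.put(topic, overrides);
    }

    public void remove(String topic) {
        configs.remove(topic);
    }

    // Returns the cached overrides, or empty Properties if none exist.
    public Properties get(String topic) {
        return configs.getOrDefault(topic, new Properties());
    }

    public static void main(String[] args) {
        TopicConfigCache cache = new TopicConfigCache();
        Properties p = new Properties();
        p.setProperty("retention.ms", "86400000");
        cache.update("clicks", p);
        System.out.println(cache.get("clicks").getProperty("retention.ms")); // 86400000
        System.out.println(cache.get("unknown").isEmpty());                  // true
    }
}
```

A ConcurrentHashMap is used because the cache would be written from a notification thread while being read from request-handler threads.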
[jira] [Created] (KAFKA-2030) Creating a topic cause Unsupported major.minor version 52.0
Jeff Fogarty created KAFKA-2030: --- Summary: Creating a topic cause Unsupported major.minor version 52.0 Key: KAFKA-2030 URL: https://issues.apache.org/jira/browse/KAFKA-2030 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.2.0 Environment: OS X 10.9.5, scala 2.11.6, zookeeper 3.4.6_1, java 1.7.0_55 Reporter: Jeff Fogarty Priority: Blocker I installed Kafka via brew. I attempt the following: kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra --partitions 3 --replication-factor 2 and get Error while executing topic command org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 52.0 java.lang.UnsupportedClassVersionError: org/apache/kafka/common/protocol/types/Schema : Unsupported major.minor version 52.0 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:800) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at kafka.server.OffsetManager$.<init>(OffsetManager.scala:357) at kafka.server.OffsetManager$.<clinit>(OffsetManager.scala) at kafka.common.Topic$.<init>(Topic.scala:29) at kafka.common.Topic$.<clinit>(Topic.scala) at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172) at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93) at kafka.admin.TopicCommand$.main(TopicCommand.scala:55) 
at kafka.admin.TopicCommand.main(TopicCommand.scala) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
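For context on the report above: "Unsupported major.minor version 52.0" means the Schema class file targets Java 8 (class-file major version 52) while the running JVM is older — the environment lists java 1.7.0_55, which supports at most major version 51 — so the likely fix is running the tools under a Java 8 JRE. A class file's target version can be checked by reading its header, as in this small sketch:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Reads the class-file header: magic (0xCAFEBABE), then minor and major
// version. Major 52 = Java 8, 51 = Java 7, 50 = Java 6.
public class ClassVersion {
    public static int majorVersionOf(Class<?> c) {
        String res = "/" + c.getName().replace('.', '/') + ".class";
        try (InputStream in = c.getResourceAsStream(res);
             DataInputStream din = new DataInputStream(in)) {
            if (din.readInt() != 0xCAFEBABE)   // class-file magic number
                throw new IOException("not a class file");
            din.readUnsignedShort();           // minor version
            return din.readUnsignedShort();    // major version
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the class-file version of java.lang.Object on this JRE.
        System.out.println(ClassVersion.majorVersionOf(Object.class));
    }
}
```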
Re: [KIP-DISCUSSION] KIP-13 Quotas
I think that is a good summary. 1. I'd favor delay over error--I think we have a design where delay will basically mimic the same behavior you would get if you had a lower capacity Kafka cluster all to yourself, which from my point of view is ideal. I'm aesthetically opposed to delay+error. 2. I actually don't think this is blocked on completing the metrics migration. I do think we need to figure out the direction, though, to decide what to do. 3. Yup. -Jay On Wed, Mar 18, 2015 at 4:02 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hey everyone, Thanks for the great discussion. There are currently a few points on this KIP that need addressing and I want to make sure we are on the same page about those. 1. Append and delay response vs delay and return error - I think we've discussed the pros and cons of each approach but haven't chosen an approach yet. Where does everyone stand on this issue? 2. Metrics Migration and usage in quotas - The metrics library in clients has a notion of quotas that we should reuse. For that to happen, we need to migrate the server to the new metrics package. - Need more clarification on how to compute throttling time and windowing for quotas. I'm going to start a new KIP to discuss metrics migration separately. That will also contain a section on quotas. 3. Dynamic Configuration management - Being discussed in KIP-5. Basically we need something that will model default quotas and allow per-client overrides. Is there something else that I'm missing? Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Wednesday, March 18, 2015 2:10 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances the quota is against all of those instances added up across all their topics. 
So actually the effect would be the same either way but throttling gives the producer the choice of either blocking or dropping. -Jay On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote: Jay, let's say an app produces to 10 different topics. one of the topic is sent from a library. due to whatever condition/bug, this lib starts to send messages over the quota. if we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize other 9 topics who are within the quota. that is the unfairness point that Ewen and I were trying to make. if broker just drop the msg and return an error/status code indicates the drop and why. then producer can just move on and accept the drop. shared buffer won't be saturated and other 9 topics won't be penalized. Thanks, Steven On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple, a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota, a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics. if producer buffer got filled up due to delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that broker should just return error without delay. 
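The delay-vs-error debate above centers on what the broker does once a client exceeds its quota. Under the delay approach the broker computes how long to hold the response so that the client's effective rate drops back to the quota. A minimal sketch of that computation — illustrative only, not the actual KIP-13 design:

```java
// Hypothetical throttle-time computation: if a client produced `observedBytes`
// in `windowMs`, and its quota is `quotaBytesPerSec`, delay the response long
// enough that the rate measured over (windowMs + delay) equals the quota.
public class ThrottleTime {
    public static long delayMs(long observedBytes, long windowMs, long quotaBytesPerSec) {
        double observedRate = observedBytes * 1000.0 / windowMs;
        if (observedRate <= quotaBytesPerSec) {
            return 0;                                   // under quota: no delay
        }
        // Solve observedBytes / ((windowMs + delay) / 1000) = quotaBytesPerSec
        double targetWindowMs = observedBytes * 1000.0 / quotaBytesPerSec;
        return Math.round(targetWindowMs - windowMs);
    }

    public static void main(String[] args) {
        // 2 MB observed in a 1 s window against a 1 MB/s quota -> 1000 ms delay
        System.out.println(delayMs(2_000_000, 1_000, 1_000_000)); // 1000
    }
}
```

Holding the response for that long is what produces the back-pressure discussed above: the producer's in-flight requests and RecordAccumulator fill up, which is exactly the shared-buffer fairness concern Steven raises.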
sorry that I am chatting to myself :) On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. delayed response will cause the producer buffer to be full, which then result in either thread blocking or message drop. On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how would throttle work without cooperation/back-off from producer. new Java producer supports non-blocking API. why would delayed response be able to slow down producer? producer will continue to fire async sends. On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a)
Re: [Discussion] Using Client Requests and Responses in Server
Thanks! Another clarification: The Common request/responses use slightly different infrastructure objects: Node instead of Broker, TopicPartition instead of TopicAndPartition and few more. I can write utilities to convert Node to Broker to minimize the scope of the change. Or I can start replacing Brokers with Nodes across the board. I'm currently taking the second approach - i.e., if MetadataRequest is now returning Node, I'm changing the entire line of dependencies to use Nodes instead of broker. Is this acceptable, or do we want to take a more minimal approach for this patch and do a larger replacement as a follow up? Gwen On Wed, Mar 18, 2015 at 3:32 PM, Jay Kreps jay.kr...@gmail.com wrote: Great. For (3) yeah I think we should just think through the end-to-end pattern for these versioned requests since it seems like we will have a number of them. The serialization code used as you described gets us to the right Struct which the user would then wrap in something like ProduceRequest. Presumably there would just be one ProduceRequest that would internally fill in things like null or otherwise adapt the struct to a usable object. On the response side we would have the version from the request to use for correct versioning. One question is whether this is enough or whether we need to have switches in KafkaApis to do things like if (produceRequest.version == 3) { /* do something */ } else { /* do something else */ } Basically it would be good to be able to write a quick wiki that was like "how to add or modify a kafka api" that explained the right way to do all this. I don't think any of this necessarily blocks this ticket since at the moment we don't have tons of versions of requests out there. -Jay On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote: See inline responses: On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think: 1. 
Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say for the internal requests we can just delete the scala code. For the old clients they will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. SimpleConsumer is obviously stuck with the old request/response. The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense? 2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment. Yep. 3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned). The way I see it working (I just started on this, so I may have gaps): * Request header contains the version * When we read the request, we use ProtoUtils.requestSchema which takes version as a parameter and is responsible to give us the right Schema, which we use to read the buffer and get the correct struct. * KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc. Does that sound about right? 4. 
Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top. I'll make a note to validate that this is the case. -Jay On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the header and body from bytes as follows: val header: RequestHeader = RequestHeader.parse(buffer) val apiKey: Int =
[jira] [Commented] (KAFKA-2031) Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning.
[ https://issues.apache.org/jira/browse/KAFKA-2031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14368239#comment-14368239 ] Sriharsha Chintalapani commented on KAFKA-2031: --- [~mnarayan] we've a JIRA to add kafka-env.sh here https://issues.apache.org/jira/browse/KAFKA-1566 Executing scripts that invoke kafka-run-class.sh results in 'permission denied to create log dir' warning. -- Key: KAFKA-2031 URL: https://issues.apache.org/jira/browse/KAFKA-2031 Project: Kafka Issue Type: Bug Components: packaging Affects Versions: 0.8.1.1 Reporter: Manikandan Narayanaswamy Priority: Minor Kafka-run-class.sh script expects LOG_DIR variable to be set, and if this variable is not set, it defaults log dir to $base_dir/logs. And in the event the executor of the script does not have the right permissions, it would lead to errors such as: {noformat} mkdir: cannot create directory `/usr/lib/kafka/bin/../logs': Permission denied. {noformat} Proposing one way to make this more configurable is by introducing kafka-env.sh. Sourcing this file would export LOG_DIR and potentially other variables if need be. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31742: Patch for KAFKA-527
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/#review76985 --- Thanks for the patch. Could we test out the patch with StressTestLog? core/src/main/scala/kafka/message/ByteBufferMessageSet.scala https://reviews.apache.org/r/31742/#comment124757 Sometimes, the message size can be large. To avoid memory fragmentation, would it be better to cap the segment size to something like 256KB? So the segment size will be min(messageSetSize, 256KB)? core/src/main/scala/kafka/message/ByteBufferMessageSet.scala https://reviews.apache.org/r/31742/#comment124756 Instead of passing in a function, I think it will be easier to understand the logic if we just pass in the message list to messageWriter.write(). This way, all writes are inside messageWriter, instead of in two places. core/src/main/scala/kafka/message/ByteBufferMessageSet.scala https://reviews.apache.org/r/31742/#comment124758 Same comment on the segment size as the above. core/src/main/scala/kafka/message/MessageWriter.scala https://reviews.apache.org/r/31742/#comment124759 For consistency, size() can just be size. core/src/main/scala/kafka/message/MessageWriter.scala https://reviews.apache.org/r/31742/#comment124760 Would it be better to name this ReservedOutput? core/src/main/scala/kafka/message/MessageWriter.scala https://reviews.apache.org/r/31742/#comment124744 Perhaps we can rename off to initOffset and offset to currOffset? core/src/main/scala/kafka/message/MessageWriter.scala https://reviews.apache.org/r/31742/#comment124743 Shouldn't we test off < b.length and off + len <= b.length? core/src/main/scala/kafka/message/MessageWriter.scala https://reviews.apache.org/r/31742/#comment124745 Perhaps it's clearer to write this as a do .. while? It feels a bit weird that the first statement is adding an amount that is initialized to 0. 
core/src/test/scala/unit/kafka/message/MessageWriterTest.scala https://reviews.apache.org/r/31742/#comment124754 Could we just use Random.nextBytes()? core/src/test/scala/unit/kafka/message/MessageWriterTest.scala https://reviews.apache.org/r/31742/#comment124755 Could we explicitly import the implicits where they're applied? That way it's clear where they're used. - Jun Rao On March 16, 2015, 10:19 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31742/ --- (Updated March 16, 2015, 10:19 p.m.) Review request for kafka. Bugs: KAFKA-527 https://issues.apache.org/jira/browse/KAFKA-527 Repository: kafka Description --- less byte copies Diffs - core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9c694719dc9b515fb3c3ae96435a87b334044272 core/src/main/scala/kafka/message/MessageWriter.scala PRE-CREATION core/src/test/scala/unit/kafka/message/MessageWriterTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/31742/diff/ Testing --- Thanks, Yasuhiro Matsuda
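The review comments above revolve around MessageWriter appending into fixed-size segments of min(messageSetSize, cap) rather than one large contiguous buffer, to avoid memory fragmentation. A rough sketch of that idea (in Java for illustration — the actual patch is Scala, and the names here are hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a segmented output buffer: writes are appended into
// fixed-cap byte[] segments, so large message sets never require one big
// contiguous allocation.
public class SegmentedBuffer {
    private final int segmentCap;
    private final List<byte[]> segments = new ArrayList<>();
    private byte[] current;
    private int pos;   // write position within the current segment
    private int size;  // total bytes written

    public SegmentedBuffer(int sizeHint, int segmentCap) {
        this.segmentCap = segmentCap;
        // first segment is min(sizeHint, segmentCap), as suggested in the review
        addSegment(Math.max(1, Math.min(sizeHint, segmentCap)));
    }

    private void addSegment(int len) {
        current = new byte[len];
        segments.add(current);
        pos = 0;
    }

    public void write(byte[] b, int off, int len) {
        // bounds check in the spirit of the review comment
        if (off < 0 || len < 0 || off + len > b.length)
            throw new IndexOutOfBoundsException();
        while (len > 0) {
            if (pos == current.length) addSegment(segmentCap);
            int n = Math.min(len, current.length - pos);
            System.arraycopy(b, off, current, pos, n);
            pos += n; off += n; len -= n; size += n;
        }
    }

    public int size() { return size; }
    public int segmentCount() { return segments.size(); }
}
```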
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Michael, Thanks for taking the time to review. Currently I did not plan on adding "Deny" but I guess it can't hurt, except that adding more constructs would probably make the acls more complex. When a topic is created with no acls provided, I was planning to add a default ACL which would allow access to everyone from all hosts. I am assuming you are referring to principal in Acl and acls were supposed to be provided in property files, stored in zk so I thought it is better to just refer to a string. We will always be using session.principal.getName to get the actual principal name. Thanks Parth On 3/18/15, 2:20 PM, Michael Herstine mherst...@linkedin.com.INVALID wrote: Hi Parth, Thanks! A few questions: 1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy setting up rules that have exceptions. E.g. "Allow principal P to READ resource R from all hosts" with "Deny principal P READ access to resource R from host H1" in combination would allow P to READ R from all hosts *except* H1. 2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it? (nit) Maybe use Principal instead of String to represent principals? On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth Overall it is looking good. Couple of questions... - Can you give an example how the policies will look like in the default implementation? - In the operations, can we support "CONNECT" also? This can be used during Session connection - Regarding access control for "Topic Creation", since we can't do it on the server side, can we de-scope it for now? And plan it as a future feature request? Thanks Bosco On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me. Although AdminUtils is a concern KIP-4 can probably fix that part. 
Thanks, Harsha On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth From: Parth Brahmbhatt pbrahmbh...@hortonworks.com Date: Thursday, March 5, 2015 at 10:33 AM To: dev@kafka.apache.org Subject: [DISCUSS] KIP-11- Authorization design for kafka security Hi, KIP-11 is open for discussion, I have updated the wiki with the design and open questions. Thanks Parth
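Michael's allow/deny example above ("allow P to READ R from all hosts" plus "deny P from host H1") implies deny-overrides evaluation: a request is permitted only if some ALLOW rule matches and no DENY rule does. A hypothetical sketch of that semantics (illustrative types, not the KIP-11 API):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical ACL check with deny-overrides semantics: a request is permitted
// iff at least one ALLOW rule matches and no DENY rule matches.
public class AclEvaluator {
    public enum Effect { ALLOW, DENY }

    public static class Rule {
        final Effect effect;
        final String principal;
        final String host;   // "*" matches any host

        public Rule(Effect effect, String principal, String host) {
            this.effect = effect;
            this.principal = principal;
            this.host = host;
        }

        boolean matches(String p, String h) {
            return principal.equals(p) && (host.equals("*") || host.equals(h));
        }
    }

    public static boolean permitted(List<Rule> acl, String principal, String host) {
        boolean allowed = false;
        for (Rule r : acl) {
            if (!r.matches(principal, host)) continue;
            if (r.effect == Effect.DENY) return false;   // deny always wins
            allowed = true;
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<Rule> acl = Arrays.asList(
            new Rule(Effect.ALLOW, "P", "*"),    // allow P from all hosts...
            new Rule(Effect.DENY, "P", "H1"));   // ...except H1
        System.out.println(permitted(acl, "P", "H2"));  // true
        System.out.println(permitted(acl, "P", "H1"));  // false
    }
}
```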
[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation
[ https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367645#comment-14367645 ] Parth Brahmbhatt commented on KAFKA-1688: - [~junrao][~jkreps] Can you guys comment on the question about TopicConfig Cache? Add authorization interface and naive implementation Key: KAFKA-1688 URL: https://issues.apache.org/jira/browse/KAFKA-1688 Project: Kafka Issue Type: Sub-task Components: security Reporter: Jay Kreps Assignee: Parth Brahmbhatt Fix For: 0.8.3 Add a PermissionManager interface as described here: https://cwiki.apache.org/confluence/display/KAFKA/Security (possibly there is a better name?) Implement calls to the PermissionsManager in KafkaApis for the main requests (FetchRequest, ProduceRequest, etc). We will need to add a new error code and exception to the protocol to indicate permission denied. Add a server configuration to give the class you want to instantiate that implements that interface. That class can define its own configuration properties from the main config file. Provide a simple implementation of this interface which just takes a user and ip whitelist and permits those in either of the whitelists to do anything, and denies all others. Rather than writing an integration test for this class we can probably just use this class for the TLS and SASL authentication testing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
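The naive implementation the JIRA describes — permit a request when the user or the client IP appears in a whitelist, deny all others — might be sketched as follows. The interface name and signature here are illustrative, not the final Kafka API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the PermissionManager idea from KAFKA-1688: a request
// is permitted if the user OR the client IP appears in a whitelist.
public interface PermissionManager {
    boolean isPermitted(String user, String clientIp);

    static PermissionManager whitelist(Set<String> users, Set<String> ips) {
        return (user, ip) -> users.contains(user) || ips.contains(ip);
    }

    static void main(String[] args) {
        PermissionManager pm = whitelist(
            new HashSet<>(Arrays.asList("alice")),
            new HashSet<>(Arrays.asList("10.0.0.5")));
        System.out.println(pm.isPermitted("alice", "1.2.3.4"));  // true: user whitelisted
        System.out.println(pm.isPermitted("bob", "1.2.3.4"));    // false: in neither list
    }
}
```

Keeping the check behind a single-method interface is what lets a server config name any implementation class, as the JIRA suggests, with the whitelist version doubling as a test fixture for TLS/SASL authentication testing.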
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Gwen, Yes, looks like KAFKA-1927 will leave TopicMetadataRequest/Response. But I believe the KIP is still tightly related to KAFKA-1927 since we are not only going to update TopicMetadataRequest there but we will introduce a bunch of new requests too. And it probably makes sense to do those correctly from scratch - without introducing scala request objects. As I understand you'll have this common infrastructure code done in KAFKA-1927. Thanks, Andrii Biletskyi On Wed, Mar 18, 2015 at 8:38 PM, Gwen Shapira gshap...@cloudera.com wrote: On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote: Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. I mentioned this in a separate thread, but it may be more relevant here: It looks like the SimpleConsumer API exposes TopicMetadataRequest and TopicMetadataResponse. This means that KAFKA-1927 doesn't remove this duplication. So I'm not sure we actually need KAFKA-1927 before implementing this KIP. This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it means we can proceed in parallel? 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. 
So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics. Thanks, Jun On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, Thanks for a great discussion! Here are the actions points: 1. Q: Get rid of all scala request objects, use java protocol definitions. A: Gwen kindly took that (KAFKA-1927). It's important to speed up review procedure there since this ticket blocks other important changes. 2. Q: Generic re-routing facility vs client maintaining cluster state. A: Jay has added pseudo code to KAFKA-1912 - need to consider whether this will be easy to implement as a server-side feature (comments are welcomed!). 3. Q: Controller field in wire protocol. A: This might be useful for clients, add this to TopicMetadataResponse (already in KIP). 4. Q: Decoupling topic creation from TMR. A: I will add the solution proposed by Jun (using clientId for that) to the KIP. 5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one version. A: It was decided to try to gather all changes to protocol (before release). In case of TMR it's worth checking: KAFKA-2020 and KIP-13 (quotas) 6. Q: JSON lib is needed to deserialize user's input in CLI tool. A: Use jackson for that, /tools project is a separate jar so shouldn't be a big deal. 7. Q: VerifyReassignPartitions vs generic status check command. A: For long-running requests like reassign partitions *progress* check request is useful, it makes sense to introduce it. Please add, correct me if I missed something. Thanks, Andrii Biletskyi On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Joel, You are right, I removed ClusterMetadata because we have partially what we need in TopicMetadata. Also, as Jay pointed out earlier, we would like to have orthogonal API, but at the same time we need to be backward compatible. 
But I like your idea and even have some other arguments for this option: There is also DescribeTopicRequest which was proposed in this KIP, it returns topic configs, partitions, replication factor plus partition ISR, ASR, leader replica. The latter part is really already there in TopicMetadataRequest. So again we'll have to add stuff to TMR, not to duplicate some info in newly added requests. However, this way we'll end up with monster request which returns cluster metadata, topic replication and config info plus partition replication data. Seems logical to split TMR to - ClusterMetadata (brokers + controller, maybe something else) - TopicMetadata (topic info + partition details) But since current TMR is involved in lots of places (including network client, as I understand) this might be very
Re: [Discussion] Using Client Requests and Responses in Server
Yes, that looks reasonable. Then, in KafkaApi, when we get into the handling of a specific request, we can construct the specific request object from struct. Thanks, Jun On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the header and body from bytes as follows: val header: RequestHeader = RequestHeader.parse(buffer) val apiKey: Int = header.apiKey val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct] 4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need a Request/Response objects that contain only the body for all requests objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. 
Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
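The parsing flow Gwen and Jun converge on in this thread — read a fixed header first, then use the api key (and version) from the header to select how the body is decoded — can be illustrated with a toy wire format. The layout below is deliberately simplified (the real Kafka protocol also carries message size, client id, and uses schema-driven Structs via ProtoUtils):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of header-first request parsing. The header layout here (apiKey,
// apiVersion, correlationId) and the body format are simplified illustrations.
public class RequestParser {
    public static class Header {
        public final short apiKey;
        public final short apiVersion;
        public final int correlationId;

        Header(short apiKey, short apiVersion, int correlationId) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
            this.correlationId = correlationId;
        }
    }

    // Step 1: the header has a fixed layout, independent of the request type.
    public static Header parseHeader(ByteBuffer buf) {
        return new Header(buf.getShort(), buf.getShort(), buf.getInt());
    }

    // Step 2: dispatch on the api key (and, in Kafka, the version) to decode
    // the body - in Kafka this is ProtoUtils.requestSchema(apiKey, version).
    public static Object parseBody(Header h, ByteBuffer buf) {
        switch (h.apiKey) {
            case 3:  // metadata request (api key 3 in the Kafka protocol)
                int topicLen = buf.getShort();
                byte[] name = new byte[topicLen];
                buf.get(name);
                return new String(name, StandardCharsets.UTF_8);
            default:
                throw new IllegalArgumentException("unknown api key " + h.apiKey);
        }
    }
}
```

Because the version travels in the header, KafkaApis handlers can consult it when building the response, which is the versioning pattern Jay asks about earlier in the thread.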
Re: Review Request 31568: Patch for KAFKA-1989
On March 18, 2015, 4:14 p.m., Guozhang Wang wrote:

core/src/main/scala/kafka/server/DelayedOperation.scala, line 373
https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line373

Trying to purge each watcher list in each doWork() call will likely increase the CPU usage a lot; we can reduce the CPU usage by only purging when watched() is too large.

A list scan happens only when the unreachable count is greater than 100. The unreachable count is incremented by polling the reference queue, so it is driven by GC cycles in some sense. If there is nothing to purge, the scan won't happen. And the count is set to zero when tryCompleteWatched is called. So I expect that the unreachable count is low most of the time.

There is a problem with using watched(): it is the count of watched requests. On a busy server, watched() may exceed the threshold more often even when there are not many entries to purge.

- Yasuhiro

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review76899
---

On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/
---

(Updated Feb. 28, 2015, 12:14 a.m.)

Review request for kafka.
Bugs: KAFKA-1989
https://issues.apache.org/jira/browse/KAFKA-1989

Repository: kafka

Description
---
new purgatory implementation

Diffs
---
core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5
core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8
core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION
core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c
core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION

Diff: https://reviews.apache.org/r/31568/diff/

Testing
---

Thanks, Yasuhiro Matsuda
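The reference-queue-driven purge Yasuhiro describes can be sketched as follows. This is an illustrative stand-in for the real DelayedOperation code — the WeakWatcherList class and its method names are hypothetical — showing how collected entries are counted by polling a ReferenceQueue, so that the expensive list scan only runs once that count passes a threshold.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

// Sketch of "purge only when enough referents have been collected":
// the unreachable count comes from polling a ReferenceQueue, so it is
// driven by GC cycles rather than by the raw number of watched entries.
public class WeakWatcherList<T> {
    private final List<WeakReference<T>> watchers = new ArrayList<>();
    private final ReferenceQueue<T> collected = new ReferenceQueue<>();
    private int unreachableCount = 0;
    private final int purgeThreshold;

    public WeakWatcherList(int purgeThreshold) {
        this.purgeThreshold = purgeThreshold;
    }

    public void watch(T item) {
        watchers.add(new WeakReference<>(item, collected));
    }

    // The queue only yields references after their referents were
    // garbage-collected, so this count tracks truly dead entries.
    private void drainQueue() {
        while (collected.poll() != null) unreachableCount++;
    }

    // Returns true only if a full list scan (the expensive part) ran.
    public boolean maybePurge() {
        drainQueue();
        if (unreachableCount <= purgeThreshold) return false;
        watchers.removeIf(ref -> ref.get() == null);
        unreachableCount = 0; // the real code also resets this in tryCompleteWatched
        return true;
    }

    public int size() {
        return watchers.size();
    }

    public static void main(String[] args) {
        WeakWatcherList<Object> list = new WeakWatcherList<>(100);
        list.watch(new Object());
        System.out.println("size=" + list.size() + " purged=" + list.maybePurge());
    }
}
```

This captures why a watched()-based trigger is worse on a busy server: watched() grows with live load, while the reference-queue count only grows when there is actually something to purge.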
Re: [DISCUSS] KIP-5 - Broker Configuration Management
Aditya, Yes, you are right. We agreed to think about how global config changes can be done without restarting the broker, consumers, or producers. I haven't yet updated the KIP accordingly, though.

In short, the problem is that KafkaConfig is spread out over different components, like SocketServer, OffsetManager, etc. This makes it hard to change config dynamically, at least if we follow the dynamic LogConfig approach. So my view is that we need to treat each config block (like quotas in your case) separately. Logically, different cases will be handled differently, and some of the configs are not eligible for dynamic change at all.

Thanks, Andrii Biletskyi

On Wed, Mar 18, 2015 at 6:54 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Hi Andrii, Thanks for the writeup. IMO, we should be able to support dynamically changing certain configs. For example, the Quota proposal relies on such a mechanism for changing quotas on the fly. Aditya

From: Andrii Biletskyi [andrii.bilets...@stealth.ly] Sent: Tuesday, March 03, 2015 10:30 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-5 - Broker Configuration Management

Hey Jun, Initially I was thinking that, instead of steps 3-4, we would state that global config changes take effect only after a broker restart. So it's just:

3-4. On each broker startup, apply the global config from ZK.

In other words, the comprehensive workflow is the following:
1. Issue ChangeGlobalConfigRequest
2. Controller validates / stores config in ZK (out of scope of this KIP)
3. Do a rolling restart of the brokers one by one to pick up the config changes

My understanding is that we won't be able to handle config changes dynamically as we do for Log config. The main reason, from my point of view, is that broker config covers settings such as num.io.threads, updating which would mean gracefully restarting some of the broker's components (in this case SocketServer), which might, in turn, be very tricky.
Thanks, Andrii Biletskyi

On Tue, Mar 3, 2015 at 7:43 PM, Jun Rao j...@confluent.io wrote:

It seems the proposed workflow is the following:
1. Client issues a global config update request to the broker.
2. Broker writes the new config to ZK.
3. Controller picks up the changes from ZK.
4. Controller propagates the config changes to all brokers.

Do we need to add a new request/response to propagate the config changes? Also, this logic is a bit different from how per-topic config changes work: each broker reads from ZK directly. It would be good to make them consistent.

Thanks, Jun

On Wed, Jan 21, 2015 at 10:34 PM, Joe Stein joe.st...@stealth.ly wrote:

Created a KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-5+-+Broker+Configuration+Management
JIRA https://issues.apache.org/jira/browse/KAFKA-1786

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
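Andrii's point that only some config blocks are eligible for dynamic change could look roughly like the sketch below. The whitelist of keys and the class name are hypothetical — this is not KIP-5's actual design, just an illustration of treating each config key separately and rejecting the ones (like num.io.threads) whose pickup would require restarting a broker component.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: dynamically-changeable keys are applied in place;
// everything else is reported back as needing a rolling restart.
public class DynamicConfigSketch {
    // Hypothetical whitelist for the example only.
    private static final Set<String> DYNAMIC_KEYS =
            Set.of("log.retention.ms", "quota.producer.default");

    private final Map<String, String> current = new HashMap<>();

    // Returns the keys that could NOT be applied dynamically (they would
    // only take effect after a broker restart, per the rolling-restart flow).
    public Set<String> apply(Map<String, String> changes) {
        Set<String> needsRestart = new HashSet<>();
        for (Map.Entry<String, String> e : changes.entrySet()) {
            if (DYNAMIC_KEYS.contains(e.getKey())) {
                current.put(e.getKey(), e.getValue());
            } else {
                needsRestart.add(e.getKey());
            }
        }
        return needsRestart;
    }

    public String get(String key) {
        return current.get(key);
    }

    public static void main(String[] args) {
        DynamicConfigSketch cfg = new DynamicConfigSketch();
        System.out.println(cfg.apply(Map.of("num.io.threads", "16")));
    }
}
```

The design choice mirrors the thread: rather than one mechanism for all of KafkaConfig, each block declares its own eligibility, and ineligible keys surface explicitly instead of silently taking effect on the next restart.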
Re: [Discussion] Using Client Requests and Responses in Server
Looking at the SimpleConsumer, we need to leave in place: ConsumerMetadataResponse FetchRequest / FetchResponse OffsetFetchRequest / OffsetFetchResponse OffsetCommitRequest / OffsetCommitResponse OffsetRequest / OffsetResponse TopicMetadata TopicMetadataRequest / TopicMetadataResponse Specifically, TopicMetadata request and response will remain duplicated after KAFKA-1927. So I am no longer sure why this is a blocker for KIP-4. Gwen On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the head and body from bytes as follow: val header: RequestHeader = RequestHeader.parse(buffer) val apiKey: Int = header.apiKey val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct] 4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need a Request/Response objects that contain only the body for all requests objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. 
In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
(Thanks Andrii for the summary)

For (1), yes, we will circle back on that shortly after syncing up in person. I think it is close to getting committed, although development for KAFKA-1927 can probably begin without it.

There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka, then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.)

I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this, and instead adding it to the topic metadata response, was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here.

As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). If we are to augment the topic metadata response, then the flow would be something like this:
- Issue topic metadata request to any broker (and discover the controller)
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.
With an explicit cluster metadata request it would be:
- Issue cluster metadata request to any broker
- Connect to the controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request

So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail.

The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to MetadataRequest/Response and add cluster metadata to it? By that same token, should we also allow querying for the consumer coordinator (and in future the transaction coordinator) as well? This leads to a bloated request which isn't very appealing and is altogether confusing.

Thanks, Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote:

Andrii, Thanks for the summary.

1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up.

2. Thinking about this a bit more, if the semantics of those write requests are async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does. This will simplify the implementation.

8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics.
Thanks, Jun

On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Guys, Thanks for a great discussion! Here are the action points:

1. Q: Get rid of all scala request objects, use java protocol definitions.
A: Gwen kindly took that (KAFKA-1927). It's important to speed up the review procedure there since this ticket blocks other important changes.

2. Q: Generic re-route facility vs client maintaining cluster state.
A: Jay has added pseudo code to KAFKA-1912 - we need to consider whether this will be easy to implement as a server-side feature (comments are welcome!).

3. Q: Controller field in wire protocol.
A: This might be useful for clients; add this to TopicMetadataResponse (already in the KIP).

4. Q: Decoupling topic creation from TMR.
A: I will add the solution proposed by Jun (using clientId for that) to the KIP.

5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one version.
A: It was decided to try to gather all protocol changes (before release). In the case of TMR it's worth checking KAFKA-2020 and KIP-13 (quotas).

6. Q: A JSON lib is needed to deserialize user input in the CLI tool.
A: Use jackson for that, /tools project is a
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Got it. Thanks for clarifying! On Wed, Mar 18, 2015 at 11:54 AM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Gwen, Yes, looks like KAFKA-1927 will leave TopicMetadataRequest/Response. But I believe, KIP is still tightly related with KAFKA-1927 since we are not only going to update TopicMetadataRequest there but we will introduce a bunch of new requests too. And it probably makes sense to do those correctly from scratch - without introducing scala request objects. As I understand you'll have this common infrastructure code done in KAFKA-1927. Thanks, Andrii Biletskyi On Wed, Mar 18, 2015 at 8:38 PM, Gwen Shapira gshap...@cloudera.com wrote: On Wed, Mar 18, 2015 at 9:34 AM, Jun Rao j...@confluent.io wrote: Andri, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. I mentioned this in a separate thread, but it may be more relevant here: It looks like the SimpleConsumer API exposes TopicMetadataRequest and TopicMetadataResponse. This means that KAFKA-1927 doesn't remove this duplication. So I'm not sure we actually need KAFKA-1927 before implementing this KIP. This doesn't mean I'm stopping work on KAFKA-1927, but perhaps it means we can proceed in parallel? 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. 
Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics.

Thanks, Jun

On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Guys, Thanks for a great discussion! Here are the action points:

1. Q: Get rid of all scala request objects, use java protocol definitions.
A: Gwen kindly took that (KAFKA-1927). It's important to speed up the review procedure there since this ticket blocks other important changes.

2. Q: Generic re-route facility vs client maintaining cluster state.
A: Jay has added pseudo code to KAFKA-1912 - we need to consider whether this will be easy to implement as a server-side feature (comments are welcome!).

3. Q: Controller field in wire protocol.
A: This might be useful for clients; add this to TopicMetadataResponse (already in the KIP).

4. Q: Decoupling topic creation from TMR.
A: I will add the solution proposed by Jun (using clientId for that) to the KIP.

5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one version.
A: It was decided to try to gather all protocol changes (before release). In the case of TMR it's worth checking KAFKA-2020 and KIP-13 (quotas).

6. Q: A JSON lib is needed to deserialize user input in the CLI tool.
A: Use jackson for that; the /tools project is a separate jar, so it shouldn't be a big deal.

7. Q: VerifyReassignPartitions vs a generic status check command.
A: For long-running requests like reassign partitions, a *progress* check request is useful; it makes sense to introduce it.

Please add to this, or correct me, if I missed something.

Thanks, Andrii Biletskyi

On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote:

Joel, You are right, I removed ClusterMetadata because we partially have what we need in TopicMetadata.
Also, as Jay pointed out earlier, we would like to have an orthogonal API, but at the same time we need to be backward compatible. But I like your idea and even have some other arguments for this option: there is also the DescribeTopicRequest proposed in this KIP, which returns topic configs, partitions, and replication factor, plus partition ISR, ASR, and leader replica. The latter part is really already there in TopicMetadataRequest. So again we'll have to add stuff to TMR so as not to duplicate that info in the newly added requests. However, this way we'll end up with a monster request which returns cluster metadata, topic replication and config info, plus partition replication data. It seems logical to split TMR into:
- ClusterMetadata (brokers + controller, maybe smth else)
- TopicMetadata (topic info + partition
Re: [Discussion] Using Client Requests and Responses in Server
OK, Andrii clarified that we need KAFKA-1927 before KIP-4 for the infrastructure for using the common request/response classes in core. Jun, when you got a moment, please confirm if the approach I'm taking is acceptable, or if you see issues that I'm missing. On Wed, Mar 18, 2015 at 11:33 AM, Gwen Shapira gshap...@cloudera.com wrote: Looking at the SimpleConsumer, we need to leave in place: ConsumerMetadataResponse FetchRequest / FetchResponse OffsetFetchRequest / OffsetFetchResponse OffsetCommitRequest / OffsetCommitResponse OffsetRequest / OffsetResponse TopicMetadata TopicMetadataRequest / TopicMetadataResponse Specifically, TopicMetadata request and response will remain duplicated after KAFKA-1927. So I am no longer sure why this is a blocker for KIP-4. Gwen On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the head and body from bytes as follow: val header: RequestHeader = RequestHeader.parse(buffer) val apiKey: Int = header.apiKey val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct] 4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need a Request/Response objects that contain only the body for all requests objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 
2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
Re: [Discussion] Using Client Requests and Responses in Server
See inline responses: On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think: 1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say for the internal requests we can just delete the scala code. For the old clients they will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. SimpleConsumer is obviously stuck with the old request/response. The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense? 2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment. Yep. 3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned). 
The way I see it working (I just started on this, so I may have gaps): * Request header contains the version * When we read the request, we use ProtoUtils.requestSchema which takes version as a parameter and is responsible to give us the right Schema, which we use to read the buffer and get the correct struct. * KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc. Does that sound about right? 4. Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top. I'll make a note to validate that this is the case. -Jay On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the head and body from bytes as follow: val header: RequestHeader = RequestHeader.parse(buffer) val apiKey: Int = header.apiKey val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct] 4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need a Request/Response objects that contain only the body for all requests objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. 
Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses. Help is appreciated :) First question:
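Gwen's versioning outline above — the request header carries the version, and ProtoUtils.requestSchema(version) returns the right Schema — can be modeled as a simple version-keyed lookup. The SchemaRegistry class below is an illustrative stand-in, not the real ProtoUtils; schemas are represented as plain strings just to show the dispatch shape.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of version-keyed schema lookup: handlers read the version
// from the request header and fetch the schema registered for that
// (apiKey, version) pair before touching any fields.
public class SchemaRegistry {
    private final Map<Long, String> schemas = new HashMap<>();

    // Pack (apiKey, version) into one map key.
    private static long key(int apiKey, int version) {
        return ((long) apiKey << 32) | (version & 0xFFFFFFFFL);
    }

    public void register(int apiKey, int version, String schema) {
        schemas.put(key(apiKey, version), schema);
    }

    // Unknown versions fail loudly rather than mis-parsing the buffer.
    public String requestSchema(int apiKey, int version) {
        String schema = schemas.get(key(apiKey, version));
        if (schema == null) {
            throw new IllegalArgumentException(
                    "Unknown version " + version + " for api " + apiKey);
        }
        return schema;
    }

    public static void main(String[] args) {
        SchemaRegistry r = new SchemaRegistry();
        r.register(3, 0, "metadata_v0");
        System.out.println(r.requestSchema(3, 0)); // metadata_v0
    }
}
```

This keeps the request objects themselves unversioned, as Jay assumes: only the serialization layer needs to branch on the header's version.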
Re: [DISCUSSION] KIP-15 close(timeout) for producer
Personally I'm in favor of (1), just to reduce the number of different APIs. People will find the difference between abort and close subtle and confusing, and the only instance where you want it is this somewhat unusual case you guys are pursuing, right?

-Jay

On Wed, Mar 18, 2015 at 2:13 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

It looks like we have another option and are now deciding between the following two interfaces:

1. close() + close(timeout)
- timeout could be either positive or zero
- only close(0) can be called from the sender thread

2. close() + abort() + close(timeout)
- timeout can be either positive or zero
- only abort() can be called from the sender thread
- abort() is equivalent to close(0) in 1) but does not join the sender thread and does not close metrics
- another thread has to call close() or close(timeout) in order to make sure the resources in the producer are gone

The two approaches provide the same functionality we need; the difference is that approach 2) follows the convention of close() and abort(). On the other hand, approach 1) saves one interface compared with approach 2) but does not follow the convention. When the two approaches come to user code, it is probably something like this:

try {
    while (!finished)
        producer.send(record, callback);
} catch (Exception e) {
    producer.close(5);
}

class CallbackImpl implements Callback {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            abort(); // or close(0) in approach 1)
    }
}

Because the two approaches lead to almost the same user code, assuming users are always calling producer.close() as a clean-up step, personally I prefer approach 2) as it follows convention. Any thoughts?

Jiangjie (Becket) Qin

On 3/17/15, 10:25 AM, Jiangjie Qin j...@linkedin.com wrote:

Hi Jun, Yes, as Guozhang said, the main reason we set a flag is because close(0) is expected to be called by the sender thread itself.
If we want to maintain the semantic meaning of close(), one alternative is to have an abort() method that does the same thing as close(0) except for the cleanup. Then, in close(timeout), after the timeout we call abort() and join the sender thread. This was one of the previous proposals. We merged abort() into close(0) because they do almost the same thing. But from what you mentioned, it might make sense to have two separate methods.

Thanks. Jiangjie (Becket) Qin

On 3/16/15, 10:31 PM, Guozhang Wang wangg...@gmail.com wrote:

Yeah, in this sense the sender thread will not exit immediately in the close(0) call, but will only terminate after the current response batch has been processed, as will the producer instance itself. There is a reason for this, though: for a clean shutdown the caller thread has to wait for the sender thread to join before closing the producer instance, but this cannot be achieved if close(0) is called by the sender thread itself (for example, in KAFKA-1659 there is a proposal from Andrew Stein on using thread.interrupt and thread.stop, but if it is called by the ioThread itself the stop call will fail). Hence we came up with the flag approach to let the sender thread close as soon as it is at the barrier of the run loop.

Guozhang

On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote:

Hmm, does that mean that after close(0), the sender thread is not necessarily gone? Normally, after closing an entity, we expect all internal threads associated with the entity to be shut down completely.

Thanks, Jun

On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Jun, close(0) will set two flags in the sender: running=false and a newly added forceClose=true. It will also set accumulator.closed=true so no further producer.send() will succeed. The sender thread will finish executing all the callbacks in the current batch of responses, then it will see the forceClose flag. It will just fail all the incomplete batches in the producer and exit.
So close(0) is a non-blocking call and sender thread will not try to join itself in close(0). Thanks. Jiangjie (Becket) Qin On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote: How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks Guozhang. It wouldn’t be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM,
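The flag-based close(0) that Jiangjie describes — set running=false and forceClose=true, never join from the sender thread itself — can be sketched as follows. The class and field names are simplified stand-ins for the real Sender/accumulator code; the loop body is elided.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of a close(0) that only flips flags, so it is safe to call from
// the sender thread itself (e.g. inside a callback): the sender notices
// forceClose at the top of its run loop, fails pending work, and exits.
public class SenderSketch implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean forceClose = new AtomicBoolean(false);
    private volatile boolean failedPendingBatches = false;

    // Non-blocking: no join here, so no deadlock when invoked from the
    // sender thread. A clean close(timeout) from another thread would
    // join the sender thread after setting these flags.
    public void closeNow() {
        running.set(false);
        forceClose.set(true);
    }

    public boolean isRunning() {
        return running.get();
    }

    public boolean failedPendingBatches() {
        return failedPendingBatches;
    }

    @Override
    public void run() {
        while (running.get()) {
            // drain accumulator, send requests, fire callbacks; a callback
            // may itself invoke closeNow() without deadlocking
        }
        if (forceClose.get()) {
            failedPendingBatches = true; // stand-in for failing incomplete batches
        }
    }

    public static void main(String[] args) {
        SenderSketch s = new SenderSketch();
        s.closeNow(); // returns immediately - only flags are set
        s.run();      // loop exits at the top; pending work is failed
        System.out.println(s.failedPendingBatches());
    }
}
```

This also shows Jun's concern concretely: after closeNow() returns, the thread running run() may still be finishing its current iteration — the flags guarantee prompt exit, not instantaneous exit.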
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Joel, Andrii,

I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So, perhaps we can proceed by not making any change to the format of TMR right now.

When we start using the create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later.

As a first-cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side.

Thanks, Jun

On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote:

(Thanks Andrii for the summary)

For (1) yes we will circle back on that shortly after syncing up in person. I think it is close to getting committed although development for KAFKA-1927 can probably begin without it.

There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.)

I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this and instead adding it to the topic metadata response was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue.
Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). If we are to augment the topic metadata response then the flow would be something like this:
- Issue topic metadata request to any broker (and discover the controller)
- Connect to controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.
With an explicit cluster metadata request it would be:
- Issue cluster metadata request to any broker
- Connect to controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request
So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By that same token should we also allow querying for the consumer coordinator (and in future the transaction coordinator) as well? This leads to a bloated request which isn't very appealing and altogether confusing.

Thanks,
Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote: Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk. This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does.
This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics.

Thanks,
Jun

On Tue, Mar 17, 2015 at 1:13 PM, Andrii Biletskyi andrii.bilets...@stealth.ly wrote: Guys, Thanks for a great discussion! Here are the action points: 1. Q: Get rid of all scala request objects, use java protocol definitions. A: Gwen kindly took that (KAFKA-1927). It's important to speed up the review procedure there since this ticket blocks other important changes. 2. Q: Generic re-route facility vs
Re: Review Request 31568: Patch for KAFKA-1989
On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192 https://reviews.apache.org/r/31568/diff/1/?file=881353#file881353line187 TBD

Guozhang Wang wrote: Replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return Boolean instead of Unit, indicating if the task has not expired and was successfully added to the timer. And then we can change the above to:
if (!operation.isComplete()) { if (!timeoutTimer.add(operation)) { operation.cancel() } }

Yasuhiro Matsuda wrote: An expired task is always successfully added to the timer and executed immediately. Do you mean completed instead of expired? TimerTask does not have a notion of completion. I kept the completion concept out of the Timer implementation since it is not essential to Timer functionality.

That makes sense. I guess I was just trying to avoid calling operation.isComplete consecutively, which may actually not be a bad thing.

On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/utils/timer/Timer.scala, line 68 https://reviews.apache.org/r/31568/diff/1/?file=881356#file881356line68 I think bucket.flush(reinsert) will always fail on all the items since their expiration time will always be < bucket expiration + tickMs, i.e. the returned bucket from the delayed queue has already expired all its items. In this case, could we just call foreach(submit) on all of them instead of trying to reinsert them?

Yasuhiro Matsuda wrote: It is true only for the lowest wheel. Reinsert is necessary to make timing wheels work. A bucket from a higher wheel may contain tasks not expired (a tick time is longer in a higher wheel).

Guozhang Wang wrote: OK, I may miss sth.
here, but this is my reasoning: The bucket is only returned from the delayed queue in line 62 if its expiration time has passed currentTime; after that at least the lowest wheel will advance to its expiration time, and hence the add call within the reinsert is doomed to fail, as task.expirationTime < wheel's time + tickMs.

Yasuhiro Matsuda wrote: If the expired bucket is from the lowest wheel, all tasks in the bucket are expired. reinsert submits the tasks to a thread pool for execution. If the expired bucket is from a higher wheel, tasks are either expired or not expired. reinsert submits the expired tasks to a thread pool and moves unexpired tasks to lower wheels.

Got it.

On March 17, 2015, 8:56 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72 https://reviews.apache.org/r/31568/diff/1/?file=881358#file881358line72 It seems the task entry of the task will only be set once throughout its lifetime; even when the task entry gets reinserted, its correspondence to the task will not change, right? If that is true we can just set the entry for the task in the constructor of the task entry.

Yasuhiro Matsuda wrote: This sets TimerTaskEntry to TimerTask. TimerTask is created independently from a Timer, then enqueued to a Timer.

Guozhang Wang wrote: Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 46 of Timer, when we create the TimerTaskEntry with the passed-in TimerTask, its entry field will be set automatically.

Yasuhiro Matsuda wrote: If a task already enqueued to a timer is enqueued again, intentionally or unintentionally (=bug), what happens? My intention here is to keep the data structure consistent in such a case. setTimerTaskEntry removes the old entry if it exists.

This is true, I was originally confused about whether we ever need to re-enqueue, but the previous comment made it clear to me now. - Guozhang --- This is an automatically generated e-mail.
To reply, visit: https://reviews.apache.org/r/31568/#review76459 --- On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ --- (Updated Feb. 28, 2015, 12:14 a.m.) Review request for kafka. Bugs: KAFKA-1989 https://issues.apache.org/jira/browse/KAFKA-1989 Repository: kafka Description --- new purgatory implementation Diffs - core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 core/src/main/scala/kafka/server/ReplicaManager.scala 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION
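To make the reinsert discussion above concrete, here is a minimal, self-contained model of the behaviour Yasuhiro describes — not the actual kafka.utils.timer code, just a sketch under the assumption that a task counts as expired once it falls within the current tick:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of flush(reinsert): when a bucket expires, its tasks are
// either run (if expired) or pushed down into a finer-grained wheel.
// Buckets from the lowest wheel contain only expired tasks; buckets from
// higher wheels may contain both, which is why reinsert is needed.
public class TimingWheelSketch {

    static class Task {
        final long expirationMs;
        Task(long expirationMs) { this.expirationMs = expirationMs; }
    }

    // Returns true if the task was (re)inserted into some wheel,
    // false if it has already expired and must be executed now.
    static boolean add(Task task, long currentTimeMs, long tickMs) {
        return task.expirationMs >= currentTimeMs + tickMs;
    }

    // flush(reinsert): drain an expired bucket; expired tasks are
    // collected for execution, unexpired ones would land in a lower wheel.
    static List<Task> flush(List<Task> bucket, long currentTimeMs, long tickMs) {
        List<Task> toRun = new ArrayList<>();
        for (Task t : bucket) {
            if (!add(t, currentTimeMs, tickMs)) {
                toRun.add(t); // expired: submit to the executor
            }
            // else: task was moved into a finer-grained wheel
        }
        return toRun;
    }

    public static void main(String[] args) {
        List<Task> bucket = List.of(new Task(100), new Task(5000));
        // at time 100 with a 1 ms tick, only the first task has expired
        System.out.println(flush(bucket, 100, 1).size()); // prints 1
    }
}
```

This mirrors Yasuhiro's point: for a lowest-wheel bucket every `add` fails (everything runs), while a higher-wheel bucket splits between execution and demotion to a lower wheel.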
Re: Review Request 32193: Patch for KAFKA-1997
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32193/#review76955 --- Ship it! Ship It! - Guozhang Wang On March 18, 2015, 7:47 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32193/ --- (Updated March 18, 2015, 7:47 p.m.) Review request for kafka. Bugs: KAFKA-1997 https://issues.apache.org/jira/browse/KAFKA-1997 Repository: kafka Description --- Replaced producer config properties string with macros Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala 11acc3103e4e4a30d7380e26355ccba09b3304bb Diff: https://reviews.apache.org/r/32193/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations
Jun, I might be wrong, but didn't we agree we will let any broker from the cluster handle *long-running* admin requests (at this time preferredReplica and reassignPartitions), via the zk admin path? Thus CreateTopics etc. should be sent only to the controller.

Thanks,
Andrii Biletskyi

On Wed, Mar 18, 2015 at 11:55 PM, Jun Rao j...@confluent.io wrote: Joel, Andrii, I think we agreed that those admin requests can be issued to any broker. Because of that, there doesn't seem to be a strong need to know the controller. So, perhaps we can proceed by not making any change to the format of TMR right now. When we start using the create topic request in the producer, we will need a new version of TMR that doesn't trigger auto topic creation. But that can be done later. As a first cut implementation, I think the broker can just write to ZK directly for createTopic/alterTopic/reassignPartitions/preferredLeaderElection requests, instead of forwarding them to the controller. This will simplify the implementation on the broker side. Thanks, Jun On Wed, Mar 18, 2015 at 11:58 AM, Joel Koshy jjkosh...@gmail.com wrote: (Thanks Andrii for the summary) For (1) yes we will circle back on that shortly after syncing up in person. I think it is close to getting committed, although development for KAFKA-1927 can probably begin without it. There is one more item we covered at the hangout, i.e., whether we want to add the coordinator to the topic metadata response or provide a clearer ClusterMetadataRequest. There are two reasons I think we should try and avoid adding the field:
- It is irrelevant to topic metadata
- If we finally do request rerouting in Kafka then the field would add little to no value. (It still helps to have a separate ClusterMetadataRequest to query for cluster-wide information such as 'which broker is the controller?' as Joe mentioned.)
I think it would be cleaner to have an explicit ClusterMetadataRequest that you can send to any broker in order to obtain the controller (and in the future possibly other cluster-wide information). I think the main argument against doing this and instead adding it to the topic metadata response was convenience - i.e., you don't have to discover the controller in advance. However, I don't see much actual benefit/convenience in this and in fact think it is a non-issue. Let me know if I'm overlooking something here. As an example, say we need to initiate partition reassignment by issuing the new ReassignPartitionsRequest to the controller (assume we already have the desired manual partition assignment). If we are to augment the topic metadata response then the flow would be something like this:
- Issue topic metadata request to any broker (and discover the controller)
- Connect to controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request to the controller.
With an explicit cluster metadata request it would be:
- Issue cluster metadata request to any broker
- Connect to controller if required (i.e., if the broker above != controller)
- Issue the partition reassignment request
So it seems to add little practical value and bloats the topic metadata response with an irrelevant detail. The other angle to this is the following - is it a matter of naming? Should we just rename topic metadata request/response to just MetadataRequest/Response and add cluster metadata to it? By that same token should we also allow querying for the consumer coordinator (and in future the transaction coordinator) as well? This leads to a bloated request which isn't very appealing and altogether confusing.

Thanks,
Joel

On Wed, Mar 18, 2015 at 09:34:12AM -0700, Jun Rao wrote: Andrii, Thanks for the summary. 1. I just realized that in order to start working on KAFKA-1927, we will need to merge the changes to OffsetCommitRequest (from 0.8.2) to trunk.
This is planned to be done as part of KAFKA-1634. So, we will need Guozhang and Joel's help to wrap this up. 2. Thinking about this a bit more, if the semantic of those write requests is async (i.e., after the client gets a response, it just means that the operation is initiated, but not necessarily completed), we don't really need to forward the requests to the controller. Instead, the receiving broker can just write the operation to ZK as the admin command line tool previously does. This will simplify the implementation. 8. There is another implementation detail for describe topic. Ideally, we want to read the topic config from the broker cache, instead of ZooKeeper. Currently, every broker reads the topic-level config for all topics. However, it ignores those for topics not hosted on itself. So, we may need to change TopicConfigManager a bit so that it caches the configs for all topics. Thanks, Jun
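The two flows Joel compares differ only in which request performs the controller discovery. A rough client-side sketch of the explicit-ClusterMetadataRequest variant follows — every type and method name here is hypothetical (neither ClusterMetadataRequest nor ReassignPartitionsRequest existed at the time); it only illustrates the three steps:

```java
import java.util.List;

// Hypothetical sketch: ask any broker who the controller is, reconnect
// if needed, then send the admin request there. Broker, findController,
// and reassignPartitions are stand-ins, not real Kafka client APIs.
public class ControllerLookupSketch {

    static class Broker {
        final int id;
        final boolean isController;
        Broker(int id, boolean isController) {
            this.id = id;
            this.isController = isController;
        }
    }

    // Step 1: a stand-in for "issue cluster metadata request to any
    // broker"; the response tells us which broker is the controller.
    static Broker findController(List<Broker> clusterMetadataResponse) {
        for (Broker b : clusterMetadataResponse) {
            if (b.isController) return b;
        }
        throw new IllegalStateException("no controller elected");
    }

    // Steps 2-3: connect to the controller if required (i.e., if the
    // broker we're talking to != controller), then issue the request.
    static String reassignPartitions(Broker connectedTo, List<Broker> cluster) {
        Broker controller = findController(cluster);
        if (controller.id != connectedTo.id) {
            // open a connection to the controller here
        }
        return "ReassignPartitionsRequest -> broker " + controller.id;
    }

    public static void main(String[] args) {
        List<Broker> cluster = List.of(new Broker(0, false), new Broker(1, true));
        System.out.println(reassignPartitions(cluster.get(0), cluster)); // -> broker 1
    }
}
```

The topic-metadata-response variant would look identical except that step 1 issues a topic metadata request, which is Joel's point about the two flows adding equal round trips.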
Re: [Discussion] Using Client Requests and Responses in Server
3. The way that things are working right now is that there is a single request object for all versions. The layout of the request object always corresponds to the latest version. Under normal version evolution, the request object should be able to be constructed from the binary of any version. For example, if an old version is missing a field, the request object will just be using a default value. If an old version has an extra field, it's just ignored.

Thanks,
Jun

On Wed, Mar 18, 2015 at 2:50 PM, Gwen Shapira gshap...@cloudera.com wrote: See inline responses: On Wed, Mar 18, 2015 at 2:26 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think: 1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say, for the internal requests we can just delete the scala code. For the old clients, they will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. SimpleConsumer is obviously stuck with the old request/response. The Producers can be converted to the common request/response without breaking compatibility. I think we should do this (even though it requires fiddling with additional network serialization code), just so we can throw the old ProduceRequest away. Does that make sense? 2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment. Yep. 3. We should think a little about how versioning will work.
Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning, but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned). The way I see it working (I just started on this, so I may have gaps):
* Request header contains the version
* When we read the request, we use ProtoUtils.requestSchema, which takes version as a parameter and is responsible for giving us the right Schema, which we use to read the buffer and get the correct struct.
* KafkaApis handlers have the header, so they can use it to access the correct fields, build the correct response, etc.
Does that sound about right? 4. Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top. I'll make a note to validate that this is the case. -Jay

On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. Create the header and body from bytes as follows:
val header: RequestHeader = RequestHeader.parse(buffer)
val apiKey: Int = header.apiKey
val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need Request/Response objects that contain only the body for all request objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency.
When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not
Build failed in Jenkins: Kafka-trunk #427
See https://builds.apache.org/job/Kafka-trunk/427/changes Changes: [wangguoz] KAFKA-1997; Hopefully last follow-up fix to get messageHandlerArgs right -- [...truncated 1921 lines...] at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40) kafka.integration.PrimitiveApiTest testPipelinedProduceRequests FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:40) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44) at kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:40) at kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:40) kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at 
kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44) at kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46) kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44) at kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46) kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooHigh FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32) at 
kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:44) at kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46) kafka.integration.AutoOffsetResetTest testResetToLatestWhenOffsetTooLow FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at
Re: Review Request 32193: Patch for KAFKA-1997
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32193/ --- (Updated March 18, 2015, 7:47 p.m.) Review request for kafka. Bugs: KAFKA-1997 https://issues.apache.org/jira/browse/KAFKA-1997 Repository: kafka Description (updated) --- Replaced producer config properties string with macros Diffs (updated) - core/src/main/scala/kafka/tools/MirrorMaker.scala 11acc3103e4e4a30d7380e26355ccba09b3304bb Diff: https://reviews.apache.org/r/32193/diff/ Testing --- Thanks, Jiangjie Qin
[jira] [Updated] (KAFKA-1997) Refactor Mirror Maker
[ https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiangjie Qin updated KAFKA-1997: Attachment: KAFKA-1997_2015-03-18_12:47:32.patch Refactor Mirror Maker - Key: KAFKA-1997 URL: https://issues.apache.org/jira/browse/KAFKA-1997 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1997.patch, KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, KAFKA-1997_2015-03-18_12:47:32.patch Refactor mirror maker based on KIP-3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker
[ https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14367761#comment-14367761 ] Jiangjie Qin commented on KAFKA-1997: - Updated reviewboard https://reviews.apache.org/r/32193/diff/ against branch origin/trunk Refactor Mirror Maker - Key: KAFKA-1997 URL: https://issues.apache.org/jira/browse/KAFKA-1997 Project: Kafka Issue Type: Improvement Reporter: Jiangjie Qin Assignee: Jiangjie Qin Attachments: KAFKA-1997.patch, KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch, KAFKA-1997_2015-03-18_12:47:32.patch Refactor mirror maker based on KIP-3 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Steven, The current proposal is actually to enforce quotas at the client/application level, NOT the topic level. So if you have a service with a few dozen instances, the quota is against all of those instances added up across all their topics. So actually the effect would be the same either way, but throttling gives the producer the choice of either blocking or dropping. -Jay

On Tue, Mar 17, 2015 at 10:08 AM, Steven Wu stevenz...@gmail.com wrote: Jay, let's say an app produces to 10 different topics. one of the topics is sent from a library. due to whatever condition/bug, this lib starts to send messages over the quota. if we go with the delayed response approach, it will cause the whole shared RecordAccumulator buffer to be filled up. that will penalize the other 9 topics which are within the quota. that is the unfairness point that Ewen and I were trying to make. if the broker just drops the msg and returns an error/status code that indicates the drop and why, then the producer can just move on and accept the drop. the shared buffer won't be saturated and the other 9 topics won't be penalized. Thanks, Steven

On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Steven, It is true that hitting the quota will cause back-pressure on the producer. But the solution is simple: a producer that wants to avoid this should stay under its quota. In other words this is a contract between the cluster and the client, with each side having something to uphold. Quite possibly the same thing will happen in the absence of a quota: a client that produces an unexpected amount of load will hit the limits of the server and experience backpressure. Quotas just allow you to set that same limit at something lower than 100% of all resources on the server, which is useful for a shared cluster. -Jay

On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu stevenz...@gmail.com wrote: wait. we create one kafka producer for each cluster. each cluster can have many topics.
if the producer buffer got filled up due to a delayed response for one throttled topic, won't that penalize other topics unfairly? it seems to me that the broker should just return an error without delay. sorry that I am chatting to myself :)

On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu stevenz...@gmail.com wrote: I think I can answer my own question. a delayed response will cause the producer buffer to be full, which then results in either thread blocking or message drop.

On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu stevenz...@gmail.com wrote: please correct me if I am missing sth here. I am not understanding how throttling would work without cooperation/back-off from the producer. the new Java producer supports a non-blocking API. why would a delayed response be able to slow down the producer? the producer will continue to fire async sends.

On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang wangg...@gmail.com wrote: I think we are really discussing two separate issues here: 1. Whether we should a) append-then-block-then-returnOKButThrottled or b) block-then-returnFailDuetoThrottled for quota actions on produce requests. Both these approaches assume some kind of well-behaveness of the clients: option a) assumes the client sets a proper timeout value while it can just ignore the OKButThrottled response, while option b) assumes the client handles the FailDuetoThrottled appropriately. For any malicious clients that, for example, just keep retrying either intentionally or not, neither of these approaches is actually effective. 2. For OKButThrottled and FailDuetoThrottled responses, shall we encode them as error codes or augment the protocol to use a separate field indicating status codes? Today we have already incorporated some status codes as error codes in the responses, e.g. ReplicaNotAvailable in MetadataResponse. The pros of this is of course using a single field for response status like the HTTP status codes, while the cons is that it requires clients to handle the error codes carefully.
I think maybe we can actually extend the single-code approach to overcome its drawbacks, that is, wrap the error code semantics for users so that users do not need to handle the codes one-by-one. More concretely, following Jay's example the client could write sth. like this:

if (error.isOK())         // status code is good or the code can be simply ignored for this request type
    process the request
else if (error.needsRetry())  // throttled, transient error, etc.
    retry
else if (error.isFatal())     // non-retriable errors, etc.
    notify / terminate / other handling

Only when the clients really want to handle, for example
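Guozhang's wrapped-codes idea might look roughly like this on the client side. The codes and bucket assignments below are purely illustrative — they are not Kafka's actual error table:

```java
// Sketch of classifying error codes into a few semantic buckets so that
// callers branch on isOK / needsRetry / isFatal instead of on every code.
// OK_BUT_THROTTLED models the hypothetical status from the quota thread.
public class ErrorClassSketch {

    enum Code { NONE, OK_BUT_THROTTLED, REQUEST_TIMED_OUT, UNKNOWN_SERVER_ERROR }

    static boolean isOK(Code c) {
        // success, including "succeeded but was throttled"
        return c == Code.NONE || c == Code.OK_BUT_THROTTLED;
    }

    static boolean needsRetry(Code c) {
        // transient errors worth retrying
        return c == Code.REQUEST_TIMED_OUT;
    }

    static boolean isFatal(Code c) {
        // everything else: non-retriable
        return !isOK(c) && !needsRetry(c);
    }

    static String handle(Code c) {
        if (isOK(c)) return "process response";
        if (needsRetry(c)) return "retry";
        return "notify / terminate";
    }

    public static void main(String[] args) {
        System.out.println(handle(Code.OK_BUT_THROTTLED)); // prints "process response"
    }
}
```

A client that does want the fine-grained code (e.g. to log that it was throttled) can still inspect it; the buckets only remove the obligation to enumerate every code.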
Re: [Discussion] Using Client Requests and Responses in Server
Hey Gwen, This makes sense to me. A couple of thoughts, mostly confirming what you said I think: 1. Ideally we would move completely over to the new style of request definition for server-side processing, even for the internal requests. This way all requests would have the same header/body struct stuff. As you say for the internal requests we can just delete the scala code. For the old clients they will continue to use their old request definitions until we eol them. I would propose that new changes will go only into the new request/response objects and the old scala ones will be permanently stuck on their current version until discontinued. So after this change that old scala code could be considered frozen. 2. I think it would be reasonable to keep all the requests under common, even though as you point out there is currently no use for some of them beyond broker-to-broker communication at the moment. 3. We should think a little about how versioning will work. Making this convenient on the server side is an important goal for the new style of request definition. At the serialization level we now handle versioning but the question we should discuss and work out is how this will map to the request objects (which I assume will remain unversioned). 4. Ideally after this refactoring the network package should not be dependent on the individual request objects. The intention is that stuff in kafka.network is meant to be generic network infrastructure that doesn't know about the particular fetch/produce apis we have implemented on top. -Jay On Wed, Mar 18, 2015 at 11:11 AM, Gwen Shapira gshap...@cloudera.com wrote: Hi Jun, I was taking a slightly different approach. Let me know if it makes sense to you: 1. Get the bytes from network (kinda unavoidable...) 2. Modify RequestChannel.Request to contain header and body (instead of a single object) 3. 
Create the header and body from bytes as follows:
val header: RequestHeader = RequestHeader.parse(buffer)
val apiKey: Int = header.apiKey
val body: Struct = ProtoUtils.currentRequestSchema(apiKey).read(buffer).asInstanceOf[Struct]
4. KafkaAPIs will continue getting RequestChannel.Request, but will now have access to body and header separately. I agree that I need Request/Response objects that contain only the body for all request objects. I'm thinking of implementing them in o.a.k.Common.Requests in Java for consistency. When we are discussing the requests/responses used in SimpleConsumer, we mean everything used in javaapi, right? Gwen

On Wed, Mar 18, 2015 at 9:55 AM, Jun Rao j...@confluent.io wrote: Hi, Gwen, I was thinking that we will be doing the following in KAFKA-1927. 1. Get the bytes from network. 2. Use a new generic approach to convert bytes into request objects. 2.1 Read the fixed request header (using the util in client). 2.2 Based on the request id in the header, deserialize the rest of the bytes into a request specific object (using the new java objects). 3. We will then be passing a header and an AbstractRequestResponse to KafkaApis. In order to do that, we will need to create similar request/response objects for internal requests such as StopReplica, LeaderAndIsr, UpdateMetadata, ControlledShutdown. Not sure whether they should be written in java or scala, but perhaps they should be only in the core project. Also note, there are some scala requests/responses used directly in SimpleConsumer. Since that's our public api, we can't remove those scala objects until the old consumer is phased out. We can remove the rest of the scala request objects. Thanks, Jun

On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm starting this thread for the various questions I run into while refactoring the server to use client requests and responses.
Help is appreciated :) First question: LEADER_AND_ISR request and STOP_REPLICA request are unimplemented in the client. Do we want to implement them as part of this refactoring? Or should we continue using the scala implementation for those? Gwen
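The header-then-body parsing flow Gwen and Jun describe above can be sketched in Java. This is purely illustrative: the header layout is reduced to apiKey/apiVersion/correlationId (the real Kafka request header also carries a clientId string), and readBody stands in for the per-api Schema lookup done by the protocol utilities.

```java
import java.nio.ByteBuffer;

public class RequestParseSketch {
    // Simplified stand-in for the client-side RequestHeader class.
    static final class RequestHeader {
        final short apiKey;
        final short apiVersion;
        final int correlationId;
        RequestHeader(short apiKey, short apiVersion, int correlationId) {
            this.apiKey = apiKey;
            this.apiVersion = apiVersion;
            this.correlationId = correlationId;
        }
        // Step 2.1: read the fixed header fields, advancing the buffer position.
        static RequestHeader parse(ByteBuffer buffer) {
            return new RequestHeader(buffer.getShort(), buffer.getShort(), buffer.getInt());
        }
    }

    // Step 2.2: dispatch on the api key to interpret the remaining bytes.
    // In the real code this returns a Struct read with the api's versioned Schema.
    static String readBody(short apiKey, ByteBuffer buffer) {
        switch (apiKey) {
            case 1:  return "FetchRequest body, " + buffer.remaining() + " bytes";
            default: return "Unknown api " + apiKey;
        }
    }

    public static void main(String[] args) {
        // Fake wire bytes: header (apiKey=1, version=0, correlationId=42) + 8-byte body.
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putShort((short) 1).putShort((short) 0).putInt(42).putLong(0L);
        buffer.flip();

        RequestHeader header = RequestHeader.parse(buffer);  // consumes the header bytes
        String body = readBody(header.apiKey, buffer);       // the rest is the body
        System.out.println("apiKey=" + header.apiKey + " correlationId=" + header.correlationId);
        System.out.println(body);
    }
}
```

The point of the split is that KafkaApis can then receive the header and body as separate objects instead of one request-specific blob.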
[jira] [Commented] (KAFKA-1912) Create a simple request re-routing facility
[ https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14367966#comment-14367966 ] Jay Kreps commented on KAFKA-1912:

1. I think this can be a single thread. The number of such requests is bounded by the number of connections, since we process only one request at a time per connection. In this respect it is no different from the purgatories, which maintain an entry per connection and have a background thread.
2. Probably from the client's point of view a timeout is a timeout, no?
3. I think it is okay for this queue to be unbounded, as the request objects will be small and there is an implicit bound from the number of connections (you can never have more than one request in flight for any connection). I agree that trying to bound it would add all sorts of complexity.
4. Not sure if I understand that comment. [~guozhang] can you explain a bit more?

Create a simple request re-routing facility
---
Key: KAFKA-1912 URL: https://issues.apache.org/jira/browse/KAFKA-1912 Project: Kafka Issue Type: Improvement Reporter: Jay Kreps Fix For: 0.8.3

We are accumulating a lot of requests that have to be directed to the correct server. This makes sense for high-volume produce or fetch requests. But it is silly to put the extra burden on the client for the many miscellaneous requests such as fetching or committing offsets. This adds a ton of practical complexity to the clients with little or no payoff in performance. We should add a generic, request-type-agnostic re-routing facility on the server. This would allow any server to accept a request and forward it to the correct destination, proxying the response back to the user. Naturally it needs to do this without blocking the thread.
The result is that a client implementation can choose to be optimally efficient and manage a local cache of cluster state, always directing its requests to the proper server, OR it can choose simplicity and send everything to a single host, letting that host figure out where to forward it. I actually think we should implement this more or less across the board, but some requests such as produce and fetch require more logic to proxy since they have to be scattered out to multiple servers and gathered back to create the response. So those could be done in a second phase. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
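The single-thread, implicitly bounded queue design Jay outlines can be sketched as follows. All names here are invented for illustration, and the actual network forwarding is faked with an immediate reply; the real facility would issue a non-blocking send to the target broker and complete the callback when the response arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class ForwardingSketch {
    // A re-routed request plus the callback that proxies the response back.
    static final class PendingForward {
        final String request;
        final String targetBroker;
        final Consumer<String> respond;
        PendingForward(String request, String targetBroker, Consumer<String> respond) {
            this.request = request;
            this.targetBroker = targetBroker;
            this.respond = respond;
        }
    }

    // Unbounded queue, but implicitly bounded: at most one in-flight
    // request per connection (Jay's point 3).
    private final BlockingQueue<PendingForward> queue = new LinkedBlockingQueue<>();

    ForwardingSketch() {
        // A single background thread handles all forwards (Jay's point 1),
        // analogous to a purgatory's background thread.
        Thread forwarder = new Thread(() -> {
            try {
                while (true) {
                    PendingForward p = queue.take();
                    // Real code: non-blocking network send to p.targetBroker,
                    // completing the callback on reply. Here we fake the reply.
                    p.respond.accept("response-from-" + p.targetBroker + ":" + p.request);
                }
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });
        forwarder.setDaemon(true);
        forwarder.start();
    }

    // Called from a request-handler thread; never blocks it.
    void forward(String request, String targetBroker, Consumer<String> respond) {
        queue.add(new PendingForward(request, targetBroker, respond));
    }

    public static void main(String[] args) throws InterruptedException {
        ForwardingSketch router = new ForwardingSketch();
        CountDownLatch done = new CountDownLatch(1);
        String[] reply = new String[1];
        router.forward("OffsetCommitRequest", "broker-2",
                r -> { reply[0] = r; done.countDown(); });
        done.await();
        System.out.println(reply[0]);
    }
}
```

Produce/fetch proxying would need the extra scatter-gather step described above, which is why it makes sense as a second phase.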
Re: [DISCUSSION] KIP-15 close(timeout) for producer
It looks like we have another option and are now deciding between the following two interfaces:

1. close() + close(timeout)
- timeout can be either positive or zero
- only close(0) can be called from the sender thread

2. close() + abort() + close(timeout)
- timeout can be either positive or zero
- only abort() can be called from the sender thread
- abort() is equivalent to close(0) in 1) but does not join the sender thread and does not close metrics
- another thread has to call close() or close(timeout) in order to make sure the resources in the producer are released

The two approaches provide the same function we need; the difference is that approach 2) follows the convention of close() and abort(), while approach 1) saves one interface method but does not follow the convention. In user code, the two approaches look roughly the same:

try {
    while (!finished)
        producer.send(record, callback);
} catch (Exception e) {
    producer.close(5);
}

class CallbackImpl implements Callback {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            producer.abort(); // or producer.close(0) under approach 1)
    }
}

Because the two approaches lead to almost the same user code, assuming users always call producer.close() as a clean-up step, personally I prefer approach 2) as it follows convention. Any thoughts? Jiangjie (Becket) Qin

On 3/17/15, 10:25 AM, Jiangjie Qin j...@linkedin.com wrote: Hi Jun, Yes, as Guozhang said, the main reason we set a flag is that close(0) is expected to be called by the sender thread itself. If we want to maintain the semantic meaning of close(), one alternative is to have an abort() method that does the same thing as close(0) except cleanup, and in close(timeout), after the timeout, call abort() and join the sender thread. This was one of the previous proposals. We merged abort into close(0) because they were almost doing the same thing. But from what you mentioned, it might make sense to have two separate methods. Thanks.
Jiangjie (Becket) Qin On 3/16/15, 10:31 PM, Guozhang Wang wangg...@gmail.com wrote: Yeah, in this sense the sender thread will not exit immediately in the close(0) call, but will only terminate after the current response batch has been processed, as will the producer instance itself. There is a reason for this though: for a clean shutdown the caller thread has to wait for the sender thread to join before closing the producer instance, but this cannot be achieved if close(0) is called by the sender thread itself (for example, in KAFKA-1659 there is a proposal from Andrew Stein to use thread.interrupt and thread.stop, but if it is called by the ioThread itself the stop call will fail). Hence we came up with the flag approach to let the sender thread close as soon as it reaches the barrier of the run loop. Guozhang On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao j...@confluent.io wrote: Hmm, does that mean that after close(0), the sender thread is not necessarily gone? Normally, after closing an entity, we expect all internal threads associated with the entity to be shut down completely. Thanks, Jun On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jun, close(0) will set two flags in the sender: running=false and a newly added forceClose=true. It will also set accumulator.closed=true so no further producer.send() will succeed. The sender thread will finish executing all the callbacks in the current batch of responses; then it will see the forceClose flag, fail all the incomplete batches in the producer, and exit. So close(0) is a non-blocking call and the sender thread will not try to join itself in close(0). Thanks. Jiangjie (Becket) Qin On 3/16/15, 2:50 PM, Jun Rao j...@confluent.io wrote: How does close(0) work if it's called from the sender thread? If close(0) needs to wait for the sender thread to join, wouldn't this cause a deadlock? Thanks, Jun On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Thanks, Guozhang.
It wouldn't be as thoroughly considered without discussing with you :) Jiangjie (Becket) Qin On 3/16/15, 1:07 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks Jiangjie, After talking to you offline on this, I have been convinced and changed my preference to blocking. The immediate-shutdown approach does have some unsafeness in some cases. Guozhang On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin j...@linkedin.com.invalid wrote: It looks like the problem we want to solve and the purpose we want to achieve is: if a user calls close() in a callback, we want to make them aware that they should use close(0) instead of close() in the callback. We have agreed that we will have an error log to inform the user about this misuse. The options differ in how we can force the user to take a look at that error log.
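The flag-based close(0) mechanism described in this thread can be sketched as follows. MiniSender and all other names here are invented for illustration; the real producer's Sender additionally closes the record accumulator and drains or fails real batches.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CloseSketch {
    static final class MiniSender implements Runnable {
        volatile boolean running = true;
        volatile boolean forceClose = false;
        final CountDownLatch exited = new CountDownLatch(1);

        public void run() {
            while (running) {
                // ... poll the network and run callbacks for the current
                // batch of responses; the flags are checked at this barrier ...
            }
            if (forceClose) {
                // fail all incomplete batches instead of draining them
            }
            exited.countDown();
        }

        // close(0) only sets flags and returns, so it is safe to call from
        // the sender thread itself: no join, hence no deadlock.
        void close(long timeoutMs) throws InterruptedException {
            running = false;
            if (timeoutMs == 0) {
                forceClose = true;
            } else {
                // blocking close: wait (up to the timeout) for the loop to exit
                exited.await(timeoutMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniSender sender = new MiniSender();
        Thread io = new Thread(sender, "kafka-sender");
        io.start();
        sender.close(0);   // non-blocking; would not deadlock even if called from `io`
        io.join(5000);
        System.out.println("sender alive: " + io.isAlive());
    }
}
```

This also shows why close(timeout > 0) must not be called from the sender thread: the await would wait on the loop's own exit.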
Re: [DISCUSS] KIP-11- Authorization design for kafka security
Hi Parth, Thanks! A few questions:

1. Do you want to permit rules in your ACLs that DENY access as well as ALLOW? This can be handy for setting up rules that have exceptions. E.g. "Allow principal P to READ resource R from all hosts" combined with "Deny principal P READ access to resource R from host H1" would allow P to READ R from all hosts *except* H1.
2. When a topic is newly created, will there be an ACL created for it? If not, would that not deny subsequent access to it?
(nit) Maybe use Principal instead of String to represent principals?

On 3/9/15, 11:48 AM, Don Bosco Durai bo...@apache.org wrote: Parth, Overall it is looking good. Couple of questions:
- Can you give an example of how the policies will look in the default implementation?
- In the operations, can we support "CONNECT" also? This can be used during session connection.
- Regarding access control for "Topic Creation", since we can't do it on the server side, can we de-scope it for now and plan it as a future feature request?
Thanks, Bosco

On 3/6/15, 8:10 AM, Harsha ka...@harsha.io wrote: Hi Parth, Thanks for putting this together. Overall it looks good to me, although AdminUtils is a concern; KIP-4 can probably fix that part. Thanks, Harsha

On Thu, Mar 5, 2015, at 10:39 AM, Parth Brahmbhatt wrote: Forgot to add links to wiki and jira. Link to wiki: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks, Parth

From: Parth Brahmbhatt pbrahmbh...@hortonworks.com Date: Thursday, March 5, 2015 at 10:33 AM To: dev@kafka.apache.org Subject: [DISCUSS] KIP-11- Authorization design for kafka security

Hi, KIP-11 is open for discussion; I have updated the wiki with the design and open questions. Thanks, Parth
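The deny-overrides-allow semantics asked about in question 1 can be sketched as below. All types and the matching scheme are invented for illustration and are not KIP-11's actual Authorizer interface; the example ACLs are the ones from the email (allow P to READ R from all hosts, deny from H1).

```java
import java.util.List;

public class AclSketch {
    enum Permission { ALLOW, DENY }
    static final String ALL_HOSTS = "*";

    record Acl(String principal, String host, String operation,
               String resource, Permission permission) {
        boolean matches(String p, String h, String op, String r) {
            return principal.equals(p)
                && (host.equals(ALL_HOSTS) || host.equals(h))
                && operation.equals(op)
                && resource.equals(r);
        }
    }

    // A matching DENY always wins, so deny rules carve out exceptions
    // from broader allow rules. No matching rule at all means no access.
    static boolean authorize(List<Acl> acls, String p, String h, String op, String r) {
        boolean allowed = false;
        for (Acl acl : acls) {
            if (!acl.matches(p, h, op, r)) continue;
            if (acl.permission == Permission.DENY) return false;
            allowed = true;
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<Acl> acls = List.of(
            new Acl("P", ALL_HOSTS, "READ", "R", Permission.ALLOW),
            new Acl("P", "H1", "READ", "R", Permission.DENY));
        System.out.println(authorize(acls, "P", "H2", "READ", "R"));  // true: allowed everywhere else
        System.out.println(authorize(acls, "P", "H1", "READ", "R"));  // false: the deny exception wins
    }
}
```

Note that this evaluation order also answers question 2's concern: with no matching ACL, access is denied by default, which is why newly created topics would need some default ACL.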