[ https://issues.apache.org/jira/browse/KAFKA-350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-350:
--------------------------------

    Attachment: kafka-350-v1.patch

This patch contains fixes to bugs in various components to make message replication work in the presence of controlled server failures. The system test under system_test/single_host_multiple_brokers passes with server failures enabled. This patch contains the following changes -

1. Topic metadata request bug
1.1. While responding to a topic metadata request, the server uses AdminUtils to query ZK for the host-port info of the leader and the other replicas. However, it can happen that one of the brokers in the replica set is offline and hence its broker registration in ZK is unavailable. Since the system test simulates exactly this scenario, the server's request handler thread was exiting due to a NoNodeException while reading the /brokers/ids/[id] path. The general problem seems to be handling error codes correctly in the request handlers and sending them to the client. I think once KAFKA-402 is resolved, all error codes will be handled cleanly by the server. For now, I've modified the topic metadata response to carry an error code per topic as well as per partition. So, if the leader is not available for some partitions, it sets the LeaderNotAvailable error code at the partition level. If some other replica is not available, it sets the ReplicaNotAvailable error code in the response instead of throwing an exception and exiting.
1.2. On the producer side, the produce request retry fails if the leader for that partition is not available. It logs all other kinds of errors and ignores them.
1.3. Changed some unit tests in AdminTest to elect a leader before making topic metadata requests, so that the test passes.
1.4. Fixed another bug in the deserialization of the topic metadata response that read the versionId and errorCode in the incorrect order.
1.5. In DefaultEventHandler, a retry relied on the topic cache inside BrokerPartitionInfo for refreshing topic metadata. However, if the topic cache is empty (when no previous send request has succeeded), it doesn't end up refreshing the metadata for the topics required by the produce request. Fixed this by passing the list of topics explicitly in the call to updateInfo().
1.6. In general, it seems like a good idea to have a global error code for the entire response (to handle global errors like an illegal request format), then a per-topic error code (to handle errors like UnknownTopic), a per-partition error code (to handle partition-level errors like LeaderNotAvailable) and also a per-replica error code (to handle ReplicaNotAvailable). Jun had a good suggestion about the format of the TopicMetadata response. I would like to file another JIRA to improve the format of the topic metadata request. (A rough sketch of such a layout appears below, after item 2.)
2. Bug in TestUtils: waitUntilLeaderIsElected was signalling a condition object without acquiring the corresponding lock (sketched below). This error was probably getting masked since we had turned off the ERROR log4j level for unit tests. Fixed this.
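The fix for item 2 is just the standard java.util.concurrent locking discipline: a Condition may only be signalled while its owning lock is held, otherwise signal()/signalAll() throws IllegalMonitorStateException. A minimal sketch of the corrected pattern; the lock and condition names here are made up, not the actual TestUtils code:

{code}
import java.util.concurrent.locks.ReentrantLock

// Illustrative names only; this is not the TestUtils implementation.
val leaderLock = new ReentrantLock()
val leaderElected = leaderLock.newCondition()

def signalLeaderChange(): Unit = {
  leaderLock.lock()              // acquire the lock before signalling
  try {
    leaderElected.signalAll()    // legal now that the lock is held
  } finally {
    leaderLock.unlock()
  }
}
{code}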
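To make item 1.6 concrete, here is a rough sketch of a response that carries a global, per-topic, per-partition and per-replica error code. All class and field names are hypothetical; this is not the actual 0.8 wire format, just the shape the follow-up JIRA could discuss:

{code}
// Hypothetical layout, not the real TopicMetadataResponse classes.
case class ReplicaMetadata(brokerId: Int, errorCode: Short)          // e.g. ReplicaNotAvailable
case class PartitionMetadata(partitionId: Int,
                             leader: Option[Int],                    // None if no leader is available
                             replicas: Seq[ReplicaMetadata],
                             errorCode: Short)                       // e.g. LeaderNotAvailable
case class TopicMetadata(topic: String,
                         partitions: Seq[PartitionMetadata],
                         errorCode: Short)                           // e.g. UnknownTopic
case class TopicMetadataResponse(versionId: Short,
                                 topics: Seq[TopicMetadata],
                                 errorCode: Short)                   // global errors, e.g. illegal request format
{code}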
3. Log recovery
3.1. Removed the highWatermark() API in Log.scala, since we used "highwatermark" to indicate the offset of the latest message flushed to disk. This was causing the server to prevent the replicas from fetching unflushed data on the leader. With replication, we use the highwatermark to denote the offset of the latest committed message. I removed all references to the older API (highWatermark). To remain consistent with setHW, I added an API called getHW. As I understand it, these APIs will be refactored/renamed to match conventions when KAFKA-354 is resolved.
3.2. To handle truncating a log segment, I added a truncateUpto API that takes in the checkpointed highwatermark value for that partition, computes the correct end offset for that log segment and truncates data after the computed end offset (a rough sketch of this recovery rule appears after item 5.2 below).
3.3. Improved log recovery to delete segments that have a start offset > highwatermark.
3.4. Fixed logEndOffset to return the absolute offset of the last message in the log for that partition.
3.5. To limit the changes in this patch, it does not move the highwatermarks for all partitions to a single file. This will be done as part of KAFKA-405.
3.6. Added a LogRecoveryTest to test recovery of log segments with and without failures.
4. Config options
4.1. We might want to revisit the defaults for all config options. For example, isr.keep.in.sync.time.ms defaulted to 30s, which seems way too optimistic. While running the system tests, most messages timed out since isr.keep.in.sync.time.ms was 30s and the frequency of bouncing the replicas was also roughly 30s. The socket timeout for the producer also defaults to 30s, which seems very long to block a producer for.
4.2. The socket.timeout.ms should probably be set to producer.request.ack.timeout.ms, if it is a non-negative value. If producer.request.ack.timeout.ms = -1, socket.timeout.ms should probably default to a meaningful value (sketched after item 5.2 below). While running the system test, I observed that the socket timeout was 30s and producer.request.ack.timeout.ms was -1, which means that the producer would always block for 30s if the server failed to send back an ACK. Since the frequency of bouncing the brokers was also 30s, most produce requests were timing out.
5. Socket server bugs
5.1. A bug in processNewResponses() causes the SocketServer to process responses in spite of going through a shutdown. It probably makes sense to let outstanding requests time out and shut down immediately.
5.2. Another bug in processNewResponses() causes it to go into an infinite loop when the selection key processing throws an exception. It failed to move on to the next key in this case. I fixed it by moving the next-key processing into the finally block (sketched below).
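To make items 3.2 and 3.3 concrete: on recovery, whole segments that start beyond the checkpointed highwatermark are deleted, and the remaining tail segment is truncated back to it. A minimal sketch under made-up types and signatures (the real Log.scala code differs; only the truncateUpto name comes from this patch):

{code}
// Hypothetical types; segments are assumed sorted by start offset.
trait LogSegment {
  def startOffset: Long
  def truncateUpto(hw: Long): Unit   // drop all messages after the given offset
  def delete(): Unit
}

def recoverTo(checkpointedHW: Long, segments: Seq[LogSegment]): Seq[LogSegment] = {
  // 3.3: delete segments whose start offset is beyond the checkpointed highwatermark
  val (kept, dropped) = segments.partition(_.startOffset <= checkpointedHW)
  dropped.foreach(_.delete())
  // 3.2: truncate the last surviving segment so the log ends at the checkpointed highwatermark
  kept.lastOption.foreach(_.truncateUpto(checkpointedHW))
  kept
}
{code}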
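Item 4.2 amounts to deriving one config value from another. A sketch of the rule, with a hypothetical helper and default value (not actual ProducerConfig code):

{code}
// Hypothetical helper: pick the socket timeout based on the ack timeout.
// ackTimeoutMs == -1 means "no ack timeout" in this sketch.
def effectiveSocketTimeoutMs(ackTimeoutMs: Int, defaultSocketTimeoutMs: Int = 5000): Int =
  if (ackTimeoutMs >= 0) ackTimeoutMs       // never block longer than the ack timeout
  else defaultSocketTimeoutMs               // otherwise fall back to a meaningful default

// effectiveSocketTimeoutMs(1500) == 1500; effectiveSocketTimeoutMs(-1) == 5000
{code}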
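And for item 5.2, the shape of the fix: advance to the next key inside a finally block, so an exception while handling one key cannot leave the loop stuck re-processing it. A structural sketch only, not the actual SocketServer code:

{code}
import java.nio.channels.SelectionKey

// Hypothetical stand-in for writing the queued response on a key.
def writeResponse(key: SelectionKey): Unit = { /* send response bytes */ }

def processKeys(keys: java.util.Iterator[SelectionKey]): Unit = {
  var key: SelectionKey = if (keys.hasNext) keys.next() else null
  while (key != null) {
    try {
      writeResponse(key)
    } catch {
      case e: Throwable => ()   // log and continue with the remaining keys
    } finally {
      key = if (keys.hasNext) keys.next() else null   // always advance, even on error
    }
  }
}
{code}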
6. Moved the zookeeper client connection from the startup() API in KafkaZookeeper to the startup() API in KafkaServer.scala. This is because the ReplicaManager is instantiated right after KafkaZookeeper and was passed the zkclient object created by KafkaZookeeper. Since KafkaZookeeper started the zkclient only in its startup() API, ReplicaManager's APIs got an NPE while trying to access the passed-in zkclient. With the fix, we can create the zookeeper client connection once in the KafkaServer startup, pass it around and tear it down in the shutdown API of KafkaServer.
7. System testing
7.1. Hacked ProducerPerformance to turn off forceful use of the async producer. I guess ProducerPerformance has grown over time into a complex blob of if-else statements. Will file another JIRA to refactor it and fix it so that all config options work well with both the sync and async producer.
7.2. Added producer ACK config options to ProducerPerformance. Right now, they are hardcoded. I'm hoping this can be fixed with the above JIRA.
7.3. The single_host_multiple_brokers system test needs to be changed after KAFKA-353. Basically, it needs to count the successfully sent messages correctly. Right now, there is no good way to do this in the script. One way is to have the system test script grep the logs to find the successfully ACKed producer requests. To get it working for now, I hacked it to use the sync producer. Hence, before these issues are fixed, it will be pretty slow.
7.4. Improved the system test to start the console consumer at the very end of the test for verification.
8. Other things that can be fixed in KAFKA-402 are the following -
8.1. When a broker gets a request for a partition for which it is not the leader, it should send back a response with an error code immediately. Right now, I see a produce request timing out in LogRecoveryTest since the producer never gets a response. It doesn't break the test since the producer retries, but it is adding unnecessary delay.
8.2. In quite a few places, we log an error and then retry, or log an error which is actually meant to be a warning. Due to this, it is hard to spot real errors during testing. We can probably fix this too as part of KAFKA-402.
9. Found an NPE while running ReplicaFetchTest. Would like to file a bug for it.
10. Controller bugs
10.1. Unit tests fail intermittently due to a NoSuchElementException thrown by testZKSendWithDeadBroker() in KafkaController. Due to this, the zookeeper server doesn't shut down and the rest of the unit tests fail. The root cause is the absence of a broker id as a key in the Java map. I think Scala maps should be used, as they force the user to handle missing keys cleanly through Option.
10.2. The ControllerBasicTest tests throw tons of zookeeper warnings complaining about a client not cleanly closing a zookeeper session. The sessions are forcefully closed only when the zookeeper server is shut down. These warnings should be paid attention to and fixed. LogTest also throws similar zk warnings.
10.3. Since the controller bugs were not in the critical path to getting replication-with-failures to work, I'd like to file another JIRA to fix them.

If the above future work sounds good, I will go ahead and file the JIRAs.

> Enable message replication in the presence of controlled failures
> -----------------------------------------------------------------
>
>                 Key: KAFKA-350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-350
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>         Attachments: kafka-350-v1.patch
>
>
> KAFKA-46 introduced the message replication feature in the absence of server failures. This JIRA will improve the log recovery logic and fix other bugs to enable message replication to happen in the presence of controlled server failures.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa

For more information on JIRA, see: http://www.atlassian.com/software/jira