[ https://issues.apache.org/jira/browse/KAFKA-350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-350:
--------------------------------

    Attachment: kafka-350-v1.patch

This patch contains fixes to bugs in various components to make message 
replication work in the presence of controlled server failures. The system test 
under system_test/single_host_multiple_brokers passes with server failures 
enabled. 

This patch contains the following changes -

1. Topic metadata request bug

1.1. While responding to a topic metadata request, the server uses the 
AdminUtils to query ZK for the host-port info of the leader and the other 
replicas. However, it can happen that one of the brokers in the replica set is 
offline, in which case its broker registration in ZK is unavailable. Since the 
system test simulates exactly this scenario, the server's request handler 
thread was exiting due to a NoNodeException while reading the /brokers/ids/[id] 
path. The general problem is handling error codes correctly in the request 
handlers and sending them to the client. I think once KAFKA-402 is resolved, 
all error codes will be handled cleanly by the server. For now, I've modified 
the topic metadata response to have an error code per topic as well as per 
partition. So, if the leader is not available for some partitions, the server 
sets the LeaderNotAvailable error code at the partition level. If some other 
replica is not available, it sets the ReplicaNotAvailable error code in the 
response instead of throwing an exception and exiting.
1.2. On the producer side, the producer now fails the produce request retry if 
the leader for that partition is not available. All other kinds of errors are 
logged and ignored.
1.3. Changed some unit tests in AdminTest to elect a leader before making topic 
metadata requests, so that the test passes.
1.4. Fixed another bug in the deserialization of the topic metadata response, 
which read the versionId and errorCode in the wrong order.
1.5. In DefaultEventHandler, a retry relied on the topic cache inside 
BrokerPartitionInfo to refresh the topic metadata. However, if the topic cache 
is empty (when no previous send request has succeeded), it never ends up 
refreshing the metadata for the topics in the produce request. Fixed this by 
passing the list of topics explicitly in the call to updateInfo().
1.6. In general, it seems like a good idea to have a global error code for the 
entire response (to handle global errors like an illegal request format), then 
a per-topic error code (to handle errors like UnknownTopic), a per-partition 
error code (to handle partition-level errors like LeaderNotAvailable) and also 
a per-replica error code (to handle ReplicaNotAvailable). Jun had a good 
suggestion about the format of the TopicMetadata response. I would like to file 
another JIRA to improve the format of the topic metadata request; a rough 
sketch of such a layered response is shown below.
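
To make 1.6 concrete, here is a minimal Scala sketch of what a response with 
error codes at every level could look like. The case class and field names are 
illustrative only, not the actual Kafka protocol classes; the real format 
should come out of the follow-up JIRA.

    // Illustrative sketch only, not the real protocol classes.
    case class ReplicaMetadata(brokerId: Int,
                               errorCode: Short)   // e.g. ReplicaNotAvailable
    case class PartitionMetadata(partitionId: Int,
                                 leader: Option[Int],
                                 replicas: Seq[ReplicaMetadata],
                                 errorCode: Short) // e.g. LeaderNotAvailable
    case class TopicMetadata(topic: String,
                             partitions: Seq[PartitionMetadata],
                             errorCode: Short)     // e.g. UnknownTopic
    case class TopicMetadataResponse(versionId: Short,
                                     errorCode: Short, // global, e.g. illegal request format
                                     topicsMetadata: Seq[TopicMetadata])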

2. A bug in TestUtils.waitUntilLeaderIsElected was signalling a condition 
object without acquiring the corresponding lock. The resulting error was 
probably getting masked since we had turned off the ERROR log4j level for unit 
tests. Fixed this; the sketch below shows the correct pattern.
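
For reference, a minimal Scala sketch of the locking pattern (the names are 
illustrative, not the actual TestUtils code): a Condition may only be 
signalled while its lock is held, otherwise signal() throws an 
IllegalMonitorStateException.

    import java.util.concurrent.locks.ReentrantLock

    object LeaderElectedSignal {
      private val lock = new ReentrantLock()
      private val leaderElected = lock.newCondition()

      // Wrong: calling leaderElected.signalAll() without holding `lock`
      // throws an IllegalMonitorStateException.

      // Right: acquire the lock, signal, release in a finally block.
      def onLeaderChange(): Unit = {
        lock.lock()
        try {
          leaderElected.signalAll()
        } finally {
          lock.unlock()
        }
      }
    }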

3. Log recovery
3.1. Removed the highWatermark() API in Log.scala, since we used 
"highwatermark" there to mean the offset of the latest message flushed to 
disk. This was causing the server to prevent the replicas from fetching 
unflushed data on the leader. With replication, we use the highwatermark to 
denote the offset of the latest committed message. I removed all references to 
the older API (highWatermark). To remain consistent with setHW, I added an API 
called getHW. As I understand it, these APIs will be refactored/renamed to 
match conventions when KAFKA-354 is resolved.
3.2. To handle truncating a log segment, I added a truncateUpto API that takes 
in the checkpointed highwatermark value for that partition, computes the 
correct end offset for that log segment and truncates data after the computed 
end offset (see the sketch after this list).
3.3. Improved log recovery to delete segments that have start offset > 
highwatermark
3.4. Fixed logEndOffset to return the absolute offset of the last message in 
the log for that partition.
3.5. To limit the scope of this patch, the highwatermarks for all partitions 
are not yet moved to a single file. This will be done as part of KAFKA-405.
3.6. Added a LogRecoveryTest to test recovery of log segments with and without 
failures
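
The sketch below illustrates the truncation logic from 3.2 and 3.3 under 
simplifying assumptions (segments sorted by start offset, one offset range per 
segment); the LogSegment shape and helper names are illustrative, not the 
actual Log.scala internals.

    // Illustrative sketch of recovery truncation, not the real Log.scala.
    case class LogSegment(startOffset: Long, var endOffset: Long) {
      def truncateTo(offset: Long): Unit = {
        // drop all data for messages with offsets beyond `offset`
        endOffset = offset
      }
    }

    // `segments` is assumed sorted by startOffset.
    def truncateUpto(segments: List[LogSegment],
                     highWatermark: Long): List[LogSegment] = {
      // 3.3: delete segments that start entirely above the highwatermark
      val retained = segments.filter(_.startOffset <= highWatermark)
      // 3.2: truncate the segment containing the highwatermark down to it
      retained.lastOption.foreach { seg =>
        if (seg.endOffset > highWatermark) seg.truncateTo(highWatermark)
      }
      retained
    }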

4. Config options
4.1. We might want to revisit the defaults for all config options. For 
example, isr.keep.in.sync.time.ms defaulted to 30s, which seems way too 
optimistic. While running the system tests, most messages timed out since 
isr.keep.in.sync.time.ms was roughly 30s and so was the frequency of bouncing 
the replicas. The socket timeout for the producer also defaults to 30s, which 
seems very long to block a producer for.
4.2. The socket.timeout.ms should probably be set to 
producer.request.ack.timeout.ms if the latter is a non-negative value. If 
producer.request.ack.timeout.ms = -1, socket.timeout.ms should default to some 
meaningful value (sketched below). While running the system test, I observed 
that the socket timeout was 30s and producer.request.ack.timeout.ms was -1, 
which means that the producer would always block for 30s if the server failed 
to send back an ACK. Since the frequency of bouncing the brokers was also 30s, 
most produce requests were timing out.
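
A minimal sketch of the defaulting rule proposed in 4.2; the helper and the 
fallback value are hypothetical, only the two config keys come from the text 
above.

    // Hypothetical helper for the 4.2 rule, not actual Kafka config code.
    def effectiveSocketTimeoutMs(ackTimeoutMs: Int,
                                 fallbackMs: Int = 5000): Int =
      if (ackTimeoutMs >= 0) ackTimeoutMs // bound the socket wait by the ack timeout
      else fallbackMs                     // -1 means "wait forever" for the ack,
                                          // so fall back to a sane socket timeout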

5. Socket server bugs
5.1. A bug in processNewResponses() causes the SocketServer to keep processing 
responses even while it is going through a shutdown. It probably makes sense to 
let outstanding requests time out and shut down immediately.
5.2. Another bug in processNewResponses() causes it to go into an infinite 
loop when the selection key processing throws an exception: it failed to move 
to the next key in this case. I fixed it by moving the next-key processing 
into the finally block, as illustrated below.
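
A simplified sketch of the loop shape after the 5.2 fix; this is not the 
actual SocketServer code, just the pattern of always advancing to the next 
element in a finally block so that an exception cannot pin the loop on the 
same response.

    // Simplified sketch of the 5.2 fix, not the real processNewResponses().
    def processNewResponses(dequeue: () => AnyRef): Unit = {
      var curr = dequeue()
      while (curr != null) {
        try {
          // process `curr`: look up its selection key, update interest
          // ops, etc. -- any of this may throw
        } catch {
          case e: Throwable => // log the failure for this response
        } finally {
          curr = dequeue()  // always advance, even if processing threw
        }
      }
    }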

6. Moved the zookeeper client connection from the startup() API in 
KafkaZookeeper to the startup() API in KafkaServer.scala. This is because the 
ReplicaManager is instantiated right after KafkaZookeeper and was passed the 
zkclient object created by KafkaZookeeper. Since KafkaZookeeper started the 
zkclient only in its startup() API, ReplicaManager's APIs got NPEs while 
trying to access the passed-in zkclient. With the fix, we create the zookeeper 
client connection once in the KafkaServer startup, pass it around and tear it 
down in the shutdown API of KafkaServer, roughly as sketched below.
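
A rough sketch of the new lifecycle; the constructor shapes of KafkaZookeeper 
and ReplicaManager are illustrative stand-ins. The point is that the ZkClient 
is created once in KafkaServer.startup() before anything that needs it, and 
closed in shutdown().

    import org.I0Itec.zkclient.ZkClient

    // Illustrative stand-ins for the real classes.
    class KafkaZookeeper(zkClient: ZkClient)
    class ReplicaManager(zkClient: ZkClient)

    class KafkaServer(zkConnect: String) {
      private var zkClient: ZkClient = null

      def startup(): Unit = {
        zkClient = new ZkClient(zkConnect)                // connect once, up front
        val kafkaZookeeper = new KafkaZookeeper(zkClient) // gets a live client
        val replicaManager = new ReplicaManager(zkClient) // no NPE: already connected
      }

      def shutdown(): Unit = {
        if (zkClient != null) zkClient.close()            // torn down with the server
      }
    }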

7. System testing
7.1. Hacked ProducerPerformance to turn off the forced use of the async 
producer. ProducerPerformance has grown over time into a complex blob of 
if-else statements; I will file another JIRA to refactor it and fix it so that 
all config options work well with both the sync and async producers.
7.2. Added producer ACK config options to ProducerPerformance. Right now, they 
are hardcoded. I'm hoping this can be fixed with the above JIRA. 
7.3. The single_host_multiple_brokers system test needs to be changed after 
KAFKA-353. Basically, it needs to count the successfully sent messages 
correctly, and right now there is no good way to do this in the script. One 
option is to have the system test script grep the logs for the successfully 
ACKed producer requests. To get it working for now, I hacked it to use the 
sync producer, so until these issues are fixed it will be pretty slow.
7.4. Improved the system test to start the console consumer at the very end of 
the test for verification

8. Another thing that can be fixed in KAFKA-402 is the following -
8.1. When a broker gets a request for a partition for which it is not the 
leader, it should send back a response with an error code immediately (see the 
sketch after this list). Right now, I see a produce request timing out in 
LogRecoveryTest since the producer never gets a response. It doesn't break the 
test since the producer retries, but it adds unnecessary delay.
8.2. In quite a few places, we log an error and then retry, or log an error 
that is actually meant to be a warning. Because of this, it is hard to spot 
real errors during testing. We can probably fix this too as part of KAFKA-402.
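
A toy sketch of the fail-fast check suggested in 8.1; the error-code names 
and values are made up for illustration and are not the actual Kafka error 
mapping.

    // Hypothetical error codes and check, for illustration only.
    object ProduceErrorCodes {
      val NoError: Short = 0
      val NotLeaderForPartition: Short = 6   // illustrative value

      def produceErrorCode(isLeader: Boolean): Short =
        if (!isLeader) NotLeaderForPartition // respond right away, do not time out
        else NoError                         // proceed to append to the local log
    }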

9. Found an NPE while running ReplicaFetchTest. Would like to file a bug for 
it.

10. Controller bugs
10.1. Unit tests fail intermittently due to a NoSuchElementException thrown by 
testZKSendWithDeadBroker() in KafkaController. When this happens, the 
zookeeper server doesn't shut down and the rest of the unit tests fail. The 
root cause is the absence of a broker id as a key in the java map. I think 
Scala maps should be used instead, since they force the user to handle missing 
values cleanly through Option (illustrated in the sketch after this list).
10.2. The ControllerBasicTest tests throw tons of zookeeper warnings 
complaining that a client did not cleanly close its zookeeper session. The 
sessions are forcefully closed only when the zookeeper server is shut down. 
These warnings should be paid attention to and fixed. LogTest is also throwing 
similar zk warnings.
10.3. Since the controller bugs were not in the critical path to getting 
replication-with-failures to work, I'd like to file another JIRA to fix them.
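
To illustrate the point in 10.1 with hypothetical data (the language 
semantics are real): a java.util.Map silently returns null for a missing key, 
while a Scala map forces the caller to go through an Option.

    import scala.collection.mutable

    val javaMap = new java.util.HashMap[Int, String]()
    javaMap.put(1, "broker-1")
    javaMap.get(2)  // null for the missing broker id -- easy to blow up later

    val scalaMap = mutable.Map(1 -> "broker-1")
    scalaMap.get(2) match {            // Option[String]: the miss must be handled
      case Some(broker) => println(broker)
      case None         => println("broker 2 not registered")
    }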

If the above future work sounds good, I will go ahead and file the JIRAs.
                
> Enable message replication in the presence of controlled failures
> -----------------------------------------------------------------
>
>                 Key: KAFKA-350
>                 URL: https://issues.apache.org/jira/browse/KAFKA-350
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>         Attachments: kafka-350-v1.patch
>
>
> KAFKA-46 introduced the message replication feature in the absence of server 
> failures. This JIRA will improve the log recovery logic and fix other bugs to 
> enable message replication to happen in the presence of controlled server 
> failures.


