[ https://issues.apache.org/jira/browse/KAFKA-350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-350:
--------------------------------
Attachment: kafka-350-v1.patch
This patch contains fixes to bugs in various components to make message
replication work in the presence of controlled server failures. The system test
under system_test/single_host_multiple_brokers passes with server failures
enabled.
This patch contains the following changes -
1. Topic metadata request bug
1.1. While responding to a topic metadata request, the server uses the
AdminUtils to query ZK for the host-port info for the leader and other
replicas. However, it can happen that one of the brokers in the replica set is
offline and hence its broker registration in ZK is unavailable. Since the
system test simulates exactly this scenario, the server's request handler
thread was exiting due to a NoNodeException while reading the /brokers/ids/[id]
path. The general problem seems to be handling error codes correctly in the
request handlers and sending them to the client. I think once KAFKA-402 is
resolved, all error codes will be handled cleanly by the server. For now, I've
modified topic metadata response to have an error code per topic as well as per
partition. So, if the leader is not available for some partitions, it will set
the LeaderNotAvailable error code at the partition level. If some other replica
is not available, it will set the ReplicaNotAvailable error code in the
response instead of throwing an exception and exiting.
1.2. On the producer side, the produce request retry fails if the leader for
that partition is not available. All other kinds of errors are logged and
ignored.
1.3. Changed some unit tests in AdminTest to elect a leader before making topic
metadata requests, so that the test passes.
1.4. Fixed another bug in the deserialization of the topic metadata response
that read the versionId and errorCode in the wrong order.
1.5. In DefaultEventHandler, during a retry, it relied on the topic cache
inside BrokerPartitionInfo for refreshing topic metadata. However, if the topic
cache is empty (when no previous send request has succeeded), it doesn't end up
refreshing the metadata for the required topics in the produce request. Fixed
this by passing the list of topics explicitly in the call to updateInfo().
1.6. In general, it seems like a good idea to have a global error code for the
entire response (to handle global errors like illegal request format), then a
per topic error code (to handle error codes like UnknownTopic), a per-partition
error code (to handle partition-level error codes like LeaderNotAvailable) and
also a per-replica error code (to handle ReplicaNotAvailable). Jun had a good
suggestion about the format of the TopicMetadata response. I would like to file
another JIRA to improve the format of the topic metadata request.
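To make the layered error codes concrete, here is a rough sketch of what such a
response layout could look like. The class and field names are made up for
illustration; they are not the actual classes in the patch or the final wire
format:

    // Hypothetical sketch of a layered metadata response; names are illustrative.
    case class ReplicaMetadata(brokerId: Int, host: String, port: Int,
                               errorCode: Short)       // e.g. ReplicaNotAvailable
    case class PartitionMetadata(partitionId: Int,
                                 leader: Option[ReplicaMetadata],
                                 replicas: Seq[ReplicaMetadata],
                                 errorCode: Short)     // e.g. LeaderNotAvailable
    case class TopicMetadata(topic: String,
                             partitionsMetadata: Seq[PartitionMetadata],
                             errorCode: Short)         // e.g. UnknownTopic
    case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
                                     errorCode: Short) // global errors, e.g. bad request format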
2. Bug in TestUtils: waitUntilLeaderIsElected was signalling a condition object
without acquiring the corresponding lock. The resulting error was probably
getting masked since we had turned off the ERROR log4j level for unit tests.
Fixed this.
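For reference, the pattern the fix follows is that both await and signal must
happen while holding the lock associated with the condition. A minimal generic
sketch (not the actual TestUtils code):

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    // Generic illustration of the lock/condition protocol, not the real TestUtils.
    object LeaderWaitSketch {
      private val lock = new ReentrantLock()
      private val leaderElected = lock.newCondition()
      private var leader: Option[Int] = None

      def waitUntilLeaderIsElected(timeoutMs: Long): Option[Int] = {
        lock.lock()
        try {
          if (leader.isEmpty)
            leaderElected.await(timeoutMs, TimeUnit.MILLISECONDS)
          leader
        } finally {
          lock.unlock()
        }
      }

      def onLeaderElected(brokerId: Int): Unit = {
        lock.lock()   // signalling without holding the lock throws IllegalMonitorStateException
        try {
          leader = Some(brokerId)
          leaderElected.signalAll()
        } finally {
          lock.unlock()
        }
      }
    }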
3. Log recovery
3.1. Removed the highWatermark() API in Log.scala, since we used
“highwatermark” to indicate the offset of the latest message flushed to disk.
This was causing the server to prevent the replicas from fetching unflushed
data on the leader. With replication, we use highwatermark to denote the offset
of the latest committed message. I removed all references to the older API
(highWatermark). To remain consistent with setHW, I added an API called getHW.
As I understand it, these APIs will be refactored/renamed to match conventions
when KAFKA-354 is resolved.
3.2. To handle truncating a log segment, I added a truncateUpto API that takes
in the checkpointed highwatermark value for that partition, computes the
correct end offset for that log segment and truncates data after the computed
end offset (see the sketch at the end of this section).
3.3. Improved log recovery to delete segments that have start offset >
highwatermark.
3.4. Fixed logEndOffset to return the absolute offset of the last message in
the log for that partition.
3.5. To limit the changes in this patch, it does not move the highwatermarks
for all partitions to a single file. This will be done as part of KAFKA-405.
3.6. Added a LogRecoveryTest to test recovery of log segments with and without
failures.
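The truncation logic referenced in 3.2 and 3.3 boils down to roughly the
following. This is a simplified sketch over a made-up segment type, not the
actual Log.scala code:

    object LogTruncationSketch {
      // Made-up segment type for illustration only.
      case class LogSegment(startOffset: Long, var endOffset: Long) {
        def truncateTo(offset: Long): Unit = { endOffset = math.min(endOffset, offset) }
      }

      def truncateUpto(segments: Seq[LogSegment], highWatermark: Long): Seq[LogSegment] = {
        // 3.3: drop segments whose start offset is beyond the committed highwatermark
        val kept = segments.filter(_.startOffset <= highWatermark)
        // 3.2: cut the remaining tail segment back to the checkpointed highwatermark
        kept.lastOption.foreach(_.truncateTo(highWatermark))
        kept
      }
    }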
4. Config options
4.1. We might want to revisit the defaults for all config options. For example,
isr.keep.in.sync.time.ms defaulted to 30s, which seems way too optimistic.
While running the system tests, most messages timed out since
isr.keep.in.sync.time.ms was 30s and the frequency of bouncing the replicas was
also roughly 30s. The socket timeout for the producer also defaults to 30s,
which seems very long to block a producer for.
4.2. The socket.timeout.ms should probably be set to
producer.request.ack.timeout.ms, if it is a non-negative value. If
producer.request.ack.timeout.ms = -1, socket.timeout.ms should probably
default to a meaningful value. While running the system test, I observed that
the socket timeout was 30s and the producer.request.ack.timeout.ms was -1,
which means that the producer would always block for 30s if the server failed
to send back an ACK. Since the frequency of bouncing the brokers was also 30s,
most produce requests were timing out.
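A small sketch of the derivation proposed in 4.2; the config names mirror the
ones above, but the default value and the surrounding code are made up:

    // Illustrative only: derive the producer socket timeout from the ack timeout.
    object ProducerTimeoutsSketch {
      val defaultSocketTimeoutMs = 5000  // hypothetical "meaningful" default

      def effectiveSocketTimeoutMs(ackTimeoutMs: Int): Int =
        if (ackTimeoutMs >= 0) ackTimeoutMs  // socket.timeout.ms follows producer.request.ack.timeout.ms
        else defaultSocketTimeoutMs          // -1 means wait forever on acks; still bound the socket wait
    }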
5. Socket server bugs
5.1. A bug in processNewResponses() causes the SocketServer to keep processing
responses even while it is going through a shutdown. It probably makes sense to
let outstanding requests time out and shut down immediately.
5.2. Another bug in processNewResponses() causes it to go into an infinite loop
when the selection key processing throws an exception. It failed to move to the
next key in this case. I fixed it by moving the next-key processing into the
finally block.
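The shape of the fix in 5.2 is to advance to the next item inside a finally
block, so that an exception while handling the current one cannot leave the
loop spinning forever. A generic sketch, not the actual SocketServer code:

    // Generic sketch: always fetch the next item, even when handling throws.
    object ResponseLoopSketch {
      def processNewResponses(nextResponse: () => Option[Array[Byte]],
                              handle: Array[Byte] => Unit): Unit = {
        var curr = nextResponse()
        while (curr.isDefined) {
          try {
            handle(curr.get)          // may throw for a bad selection key / channel
          } catch {
            case e: Throwable => ()   // log and continue (elided)
          } finally {
            curr = nextResponse()     // advancing here prevents the infinite loop
          }
        }
      }
    }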
6. Moved the zookeeper client connection from startup() API in KafkaZookeeper
to startup() API in KafkaServer.scala. This is because the ReplicaManager is
instantiated right after KafkaZookeeper and was passed the zkclient object
created by KafkaZookeeper. Since KafkaZookeeper started the zkclient only in
its startup() API, ReplicaManager's APIs got an NPE while trying to access the
passed-in zkclient. With the fix, we can create the zookeeper client connection
once in the KafkaServer startup, pass it around and tear it down in the
shutdown API of KafkaServer.
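In other words, the lifecycle becomes roughly the following. Every class here
is a simplified stand-in for illustration, not the real Kafka class:

    // Sketch of the intended zkclient lifecycle; all classes are stand-ins.
    class ZkConnectionStub(connectString: String) { def close(): Unit = () }
    class KafkaZookeeperStub(zk: ZkConnectionStub)
    class ReplicaManagerStub(zk: ZkConnectionStub)

    class KafkaServerSketch(zkConnect: String) {
      private var zkClient: ZkConnectionStub = null

      def startup(): Unit = {
        zkClient = new ZkConnectionStub(zkConnect)            // connect once, up front
        val kafkaZookeeper = new KafkaZookeeperStub(zkClient)  // components receive a live connection
        val replicaManager = new ReplicaManagerStub(zkClient)  // so their APIs no longer hit an NPE
      }

      def shutdown(): Unit =
        if (zkClient != null) zkClient.close()                 // torn down once, at the end
    }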
7. System testing
7.1. Hacked ProducerPerformance to turn off forceful use of the async producer.
I guess ProducerPerformance has grown over time into a complex blob of if-else
statements. Will file another JIRA to refactor it and fix it so that all config
options work well with both the sync and async producers.
7.2. Added producer ACK config options to ProducerPerformance. Right now, they
are hardcoded. I'm hoping this can be fixed with the above JIRA.
7.3. The single_host_multiple_brokers system test needs to be changed after
KAFKA-353. Basically, it needs to count the successfully sent messages
correctly. Right now, there is no good way to do this in the script. One way is
to have the system test script grep the logs to find the successfully ACKed
producer requests. To get it working for now, I hacked it to use the sync
producer. Hence, until these issues are fixed, it will be pretty slow.
7.4. Improved the system test to start the console consumer at the very end of
the test for verification.
8. Another thing that can be fixed in KAFKA-402 is the following -
8.1. When a broker gets a request for a partition for which it is not the
leader, it should send back a response with an error code immediately. Right
now, I see a produce request timing out in LogRecoveryTest since the producer
never gets a response. It doesn't break the test since the producer retries,
but it is adding unnecessary delay.
8.2. In quite a few places, we log an error and then retry, or log an error
which is actually meant to be a warning. Due to this, it is hard to spot real
errors during testing. We can probably fix this too as part of KAFKA-402.
9. Found an NPE while running ReplicaFetchTest. Would like to file a bug for it.
10. Controller bugs
10.1. Unit tests fail intermittently due to a NoSuchElementException thrown by
testZKSendWithDeadBroker() in KafkaController. Due to this, the zookeeper
server doesn't shut down and the rest of the unit tests fail. The root cause is
the absence of a broker id as a key in the Java map. I think Scala maps should
be used, as they force the user to handle missing keys cleanly through Option
(see the sketch at the end of this section).
10.2. ControllerBasicTest tests throw tons of zookeeper warnings that complain
about a client not cleanly closing a zookeeper session. The sessions are
forcefully closed only when the zookeeper server is shut down. These warnings
should be paid attention to and fixed. LogTest is also throwing similar zk
warnings.
10.3. Since controller bugs were not in the critical path to getting
replication-with-failures to work, I'd like to file another JIRA to fix them.
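As a small illustration of the point in 10.1 (generic code, the map contents
are made up): a Scala Map lookup returns an Option, which forces the
missing-key case to be handled explicitly, whereas a java.util.Map silently
returns null:

    // Generic illustration of the Option-based lookup suggested in 10.1.
    object BrokerLookupSketch {
      val liveBrokers: Map[Int, String] = Map(0 -> "host0:9092", 1 -> "host1:9092")

      def leaderHost(brokerId: Int): String =
        liveBrokers.get(brokerId) match {
          case Some(host) => host
          case None       => "broker " + brokerId + " is not registered; handle this explicitly"
        }

      // With a java.util.HashMap, get(brokerId) would silently return null and the
      // missing broker would only surface later as an exception far from the cause.
    }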
If the above future work sounds good, I will go ahead and file the JIRAs.
> Enable message replication in the presence of controlled failures
> -----------------------------------------------------------------
>
> Key: KAFKA-350
> URL: https://issues.apache.org/jira/browse/KAFKA-350
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Attachments: kafka-350-v1.patch
>
>
> KAFKA-46 introduced message replication feature in the absence of server
> failures. This JIRA will improve the log recovery logic and fix other bugs to
> enable message replication to happen in the presence of controlled server
> failures