[ https://issues.apache.org/jira/browse/KAFKA-498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neha Narkhede updated KAFKA-498:
--------------------------------
Attachment: kafka-498-v1.patch
1. Refactored the current procedural controller code into a functional style
1.1. Use pattern matching (match/case) instead of if-else
1.2. Avoid explicit return statements from try-catch blocks
1.3. Use Option instead of null. This is very important: several places
handled only the non-null case, which left the door open to NPEs in error
cases.
1.4. Rename variables in all capital letters to camel case
1.5. Fixed logging, since many log statements were at INFO level and were
somewhat unclear.
1.6. Fixed indentation; some places used 4 spaces, others used 2.
1.7. ZkUtils.scala
1.7.1. Refactored readDataMaybeNull to return an Option instead of null. This
allows every caller that queries an ephemeral path to handle the case where the
node no longer exists in ZooKeeper.
1.7.2. Refactored getBrokerInfoFromIds to handle only a single broker id, since
most callers used it to query a single broker id.
1.8. Many places in the code did not wrap lines correctly. Fixed this where I
could.
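The Option, pattern-matching and single-id changes in section 1 can be
illustrated with a small sketch. It is not the patch's code: the in-memory
`store` stands in for ZooKeeper, and `BrokerInfo`, `ZkReadSketch`,
`parseBroker`, `describe` and the "host:port" payload format are hypothetical.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical, simplified broker record; the real Kafka Broker class differs.
final case class BrokerInfo(id: Int, host: String, port: Int)

object ZkReadSketch {
  // Hypothetical in-memory stand-in for ZooKeeper (path -> data); not the ZkClient API.
  val store = TrieMap[String, String]()

  // Returns None instead of null once an (ephemeral) node has disappeared.
  def readDataMaybeNull(path: String): Option[String] = store.get(path)

  // Parses an assumed "host:port" payload. try/catch is an expression,
  // so the result is returned without an explicit `return`.
  def parseBroker(id: Int, data: String): Option[BrokerInfo] =
    try {
      data.split(":") match {
        case Array(host, port) => Some(BrokerInfo(id, host, port.toInt))
        case _                 => None
      }
    } catch {
      case _: NumberFormatException => None
    }

  // Single-id lookup in the spirit of the refactored getBrokerInfoFromIds.
  def getBrokerInfo(id: Int): Option[BrokerInfo] =
    readDataMaybeNull(s"/brokers/ids/$id").flatMap(data => parseBroker(id, data))

  // Pattern matching on the Option instead of if (x != null) ... else ...
  def describe(id: Int): String = getBrokerInfo(id) match {
    case Some(b) => s"broker ${b.id} at ${b.host}:${b.port}"
    case None    => s"broker $id not available"
  }
}
```

Because every lookup returns an Option, a broker whose ephemeral node vanished
between the watch firing and the read simply yields None instead of a
NullPointerException.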
2. KafkaController.scala
2.1 Renamed deliverLeaderAndISRFromZookeeper to readLeaderAndIsrFromZookeeper
2.2 There is a race condition in KafkaController: it doesn't synchronize
access to the controller's data structures while creating a new session.
controllerRegisterOrFailover is a private API that modifies almost all of the
internal controller data structures that require synchronization. Since this
API does not synchronize on the controller lock itself, every caller must hold
that lock. Fixed handleNewSession to hold the controller lock while calling
controllerRegisterOrFailover, since it can execute concurrently with the
startup procedure.
2.3 In onBrokerChange, default to an empty list instead of null. This lets us
avoid null checks.
2.4 Refactored the onBrokerChange() API and moved the leader election logic to
a separate API. After starting down this path, I realized that this code needs
a major refactoring, which I'd like to do in a separate patch, so I filed
KAFKA-499 to cover it. For now, we can keep this change, although it will only
look complete once KAFKA-499 is in.
2.5 The onBrokerChange() and initLeaders() APIs are very similar and duplicate
quite a lot of code. I refactored onBrokerChange() but later realized that
initLeaders() would have to be refactored too. To keep this patch small enough,
I will fix this as part of KAFKA-499.
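The locking rule in 2.2 and the empty-list default in 2.3 can be sketched as
follows, assuming Scala 2.13+ (for `scala.jdk.CollectionConverters`).
`ControllerSketch`, `registerOrFailover` and `liveBrokers` are hypothetical
names; the real KafkaController tracks far more state than a set of broker ids.

```scala
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Hypothetical, heavily simplified controller that only illustrates the locking pattern.
class ControllerSketch {
  private val controllerLock = new Object
  // Shared state: read or written only while holding controllerLock.
  private val liveBrokerIds = mutable.Set[Int]()

  // Plays the role of controllerRegisterOrFailover: it mutates shared state
  // but does not lock, so every caller must already hold controllerLock.
  private def registerOrFailover(currentBrokerIds: Seq[Int]): Unit = {
    liveBrokerIds.clear()
    liveBrokerIds ++= currentBrokerIds
  }

  def startup(currentBrokerIds: Seq[Int]): Unit = controllerLock.synchronized {
    registerOrFailover(currentBrokerIds)
  }

  // Assumed to be called from a ZooKeeper listener thread, so it can race with
  // startup(); taking the same lock serializes the two.
  def handleNewSession(currentBrokerIds: Seq[Int]): Unit = controllerLock.synchronized {
    registerOrFailover(currentBrokerIds)
  }

  // A null child list is turned into an empty list, so the body needs no null checks.
  def onBrokerChange(children: java.util.List[String]): Unit = controllerLock.synchronized {
    val ids = Option(children).map(_.asScala.toList).getOrElse(Nil).map(_.toInt)
    liveBrokerIds.clear()
    liveBrokerIds ++= ids
  }

  def liveBrokers: Set[Int] = controllerLock.synchronized { liveBrokerIds.toSet }
}
```

Keeping the private method lock-free and locking in each public caller lets a
caller that already holds controllerLock combine several such calls inside one
critical section.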
3. ControllerChannelManager & KafkaController
3.1 The controller maintains 3 types of information per broker: the request
thread, the message queue and the socket channel. Currently, they are
maintained in 3 separate variables and we need to ensure all 3 are synchronized
correctly. To simplify this, I created a case class that wraps this state in
one object.
3.2 There is a lock object whose purpose is to synchronize access to the broker
cache. Currently, the lock doesn't seem to protect all accesses to these
caches, which looks like a synchronization bug. Fixed the startup API to also
synchronize access to the broker cache.
3.3 The addBroker logic was duplicated in 2 places. Refactored the
constructors to add an auxiliary constructor that calls the addBroker API,
which handles creating controller state info for a new broker with the
appropriate synchronization. Currently, the constructor duplicates the code to
add a new broker, probably because it is not right to invoke an API from
inside the primary constructor.
3.4 There are 2 ways to remove a broker from the controller's broker cache:
during shutdown of the channel manager, or when a broker change listener fires.
Since removeBroker is a public API, it is synchronized using the brokerLock.
The shutdown API calls removeBroker internally and ends up acquiring and
releasing the lock multiple times, although acquiring the brokerLock just once
for the entire shutdown API is sufficient. Refactored the common removeBroker
logic into a private API, removeExistingBroker, that doesn't synchronize on the
brokerLock. Changed removeBroker to acquire the lock and call
removeExistingBroker, and changed shutdown to acquire the lock once and call
removeExistingBroker.
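Items 3.1 to 3.4 can be sketched together. `FakeChannel`, `FakeRequestThread`,
`BrokerStateInfo` and `ChannelManagerSketch` are hypothetical placeholders for
the real socket channel, request thread and channel manager; only the
structure (one case class per broker, every cache access under brokerLock, an
auxiliary constructor reusing addBroker, and a lock-free removeExistingBroker)
follows the description above.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.collection.mutable

// Hypothetical stand-ins for the real socket channel and request send thread.
final class FakeChannel(val brokerId: Int) {
  @volatile var connected = true
  def disconnect(): Unit = { connected = false }
}

final class FakeRequestThread(val brokerId: Int, val queue: BlockingQueue[String]) {
  @volatile var running = false
  def start(): Unit = { running = true }
  def shutdown(): Unit = { running = false }
}

// All per-broker state in one object, so it is added and removed as a unit.
final case class BrokerStateInfo(
    channel: FakeChannel,
    messageQueue: BlockingQueue[String],
    requestThread: FakeRequestThread)

class ChannelManagerSketch {
  private val brokerLock = new Object
  private val brokerStateInfo = mutable.HashMap[Int, BrokerStateInfo]()

  // Auxiliary constructor: this() initializes the fields above first,
  // so reusing addBroker here is safe and removes the duplicated add logic.
  def this(initialBrokerIds: Seq[Int]) = {
    this()
    initialBrokerIds.foreach(addBroker)
  }

  def addBroker(brokerId: Int): Unit = brokerLock.synchronized {
    if (!brokerStateInfo.contains(brokerId)) {
      val queue = new LinkedBlockingQueue[String]()
      val thread = new FakeRequestThread(brokerId, queue)
      brokerStateInfo(brokerId) = BrokerStateInfo(new FakeChannel(brokerId), queue, thread)
      thread.start()
    }
  }

  // Public entry point: acquires brokerLock for a single removal.
  def removeBroker(brokerId: Int): Unit = brokerLock.synchronized {
    removeExistingBroker(brokerId)
  }

  // Acquires brokerLock once for the whole shutdown instead of once per broker.
  def shutdown(): Unit = brokerLock.synchronized {
    brokerStateInfo.keys.toList.foreach(removeExistingBroker)
  }

  def brokerIds: Set[Int] = brokerLock.synchronized { brokerStateInfo.keySet.toSet }

  // Does not lock: callers must already hold brokerLock.
  private def removeExistingBroker(brokerId: Int): Unit =
    brokerStateInfo.remove(brokerId).foreach { info =>
      info.channel.disconnect()
      info.messageQueue.clear()
      info.requestThread.shutdown()
    }
}
```

shutdown copies the key set to a list before iterating, so removing entries
inside the loop does not modify the collection being traversed.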
4. Renamed LeaderAndISR to LeaderAndIsr
5. Renamed BrokerNotExistException to BrokerNotAvailableException to remain
consistent with other exceptions of the same type (LeaderNotAvailableException,
ReplicaNotAvailableException).
NOTE: Apache svn seems to be hanging at the time of uploading this patch. It
applies cleanly on revision 1380945.
> Controller code has race conditions and synchronization bugs
> ------------------------------------------------------------
>
> Key: KAFKA-498
> URL: https://issues.apache.org/jira/browse/KAFKA-498
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Neha Narkhede
> Assignee: Neha Narkhede
> Labels: bugs
> Attachments: kafka-498-v1.patch
>
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> The controller maintains some internal data structures that are updated by
> state changes triggered by zookeeper listeners. There are race conditions in
> the controller channel manager and the controller state machine.
--
This message is automatically generated by JIRA.