[ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13472753#comment-13472753 ]
Neha Narkhede edited comment on KAFKA-506 at 10/10/12 12:58 AM:
----------------------------------------------------------------

I have to mention that some of my comments may not be directly related to this patch, but were found while inspecting the new code closely :) Since you know the code better, feel free to file follow-up JIRAs.

1. Log
1.2 In findRange, the following statement runs the risk of integer overflow, giving incorrect results from the binary search:
val mid = ceil((high + low) / 2.0).toInt
It will probably be better to use:
val mid = low + ceil((high - low) / 2.0).toInt
(A small sketch of this is appended at the end of this comment.)
1.3 It seems that there are only 2 usages of the findRange API that takes in the array length. We already have an API that covers that use case, findRange[T <: Range](ranges: Array[T], value: Long), and it is used by the majority of callers. We can make the findRange method that has the actual binary search logic private and change the 2 use cases in Log.scala to use the public method that assumes the array length.
1.4 In truncateTo, it is possible that the log file was successfully deleted but the index file was not. In this case, we would end up with an unused index file that is never deleted from the Kafka log directory.
1.5 In loadSegments, we need to rebuild any missing index files; otherwise it will error out at a later time. Do we have a follow-up JIRA to cover this? It seems like a blocker to me.

2. LogManager
2.1 numPartitions is an unused class variable.

3. FileMessageSet
3.1 In the searchFor API, fix the comment to say that it searches for the first/least offset that is >= the target offset. Right now it says it searches for the last offset that is >= the target offset.
3.2 The searchFor API returns a pair of (offset, position). Right now, it does not always return the offset of the message at the returned position. If the file message set is sparse, it returns the offset of the next message, so the offset and position do not point to the same message in the log. Currently we are not using the offset returned by the read() API, but if we do in the future, it would be good for it to be consistent.
3.3 In the searchFor API, one of the statements uses 12 and the other uses MessageSet.LogOverhead. I think the while condition is easier to understand if it says MessageSet.LogOverhead.

4. LogSegment
4.1 It is better to make translateOffset return an Option. That way, every usage of this API is forced to handle the case where the position was not found in the log segment. (A small sketch of this is appended at the end of this comment.)
4.2 It might make sense to change all the places that use this segment size to an Int instead of a Long.

5. ConsumerIterator
Right now, while committing offsets for a compressed message set, the consumer can still get duplicates. However, we could probably fix this by making the ConsumerIterator smarter and discarding messages with offset < fetch offset. (A small sketch of this is appended at the end of this comment.)

6. ReplicaFetcherThread
When the follower fetches data from the leader, it uses log.append, which re-computes the logical message ids. This involves recompression when the data is compressed, which it is in production. This can be avoided by making the data copy from leader -> follower smarter.

7. MessageCompressionTest
There are 2 unused imports in this file.

8. ByteBufferMessageSet
8.1 There are 3 unused imports in this file.
8.2 The return statement in the create() API is redundant.
9. OffsetIndex
9.1 The last return statement in indexSlotFor is redundant.
9.2 The first return statement in indexSlotFor can be safely removed by using case-match or by putting the rest of the logic in the else branch of the if-else block. (A small sketch of this is appended at the end of this comment.)

10. Performance
Run a performance test to see the impact, if any, on throughput/latency due to this patch. What I am curious about is the performance impact of the following, which are the changes that can affect performance compared to pre-KAFKA-506:
10.1 Recompression of data during replica reads
10.2 Recompression of data to assign correct offsets inside a compressed message set
10.3 The linear search in the file segment to find the message with a given id. This depends on the index interval, and there needs to be a balance between index size and index interval.
10.4 The impact of making the log memory mapped
10.5 Overhead of using the index to read/write data in Kafka

11. KafkaApis
Unused imports in this file.

Just to summarize, so that we understand the follow-up work and also the JIRAs that got automatically resolved due to this feature. Please correct me if I missed something here.

Follow-up JIRAs:
1. Retain key in producer (KAFKA-544)
2. Change sizeInBytes() to Int (KAFKA-556)
3. Fix consumer offset commit in ConsumerIterator for compressed message sets (KAFKA-546)
4. Remove the recompression involved while fetching data from follower to leader (KAFKA-557)
5. Rebuild missing index files (KAFKA-561)
6. Add performance test for log subsystem (KAFKA-545)
7. Overall performance analysis due to the factors listed above

JIRAs resolved due to this feature:
1. Fix offsets returned as part of producer response (KAFKA-511)
2. Consumer offset issue during unclean leader election (KAFKA-497)
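A couple of small sketches to illustrate the points above. These are illustrations only, not code from the patch; any names that do not appear in the patch are placeholders.

For 1.2, the overflow-safe midpoint, shown as the arithmetic in isolation rather than the real findRange:

{code:scala}
import scala.math.ceil

// high + low can overflow Int when both indices are large; high - low always
// fits in an Int for valid indices (low <= high), so this form is safe.
def safeMid(low: Int, high: Int): Int =
  low + ceil((high - low) / 2.0).toInt
{code}

For 4.1, making translateOffset return an Option so every caller has to handle a miss. OffsetPosition and the lookup here are simplified stand-ins, not the real segment code:

{code:scala}
case class OffsetPosition(offset: Long, position: Int)

// Return the first index entry at or past the target offset,
// or None if the target is beyond everything in this segment.
def translateOffset(target: Long, entries: Seq[OffsetPosition]): Option[OffsetPosition] =
  entries.find(_.offset >= target)

// Every caller is now forced to deal with the miss explicitly:
def startPosition(target: Long, entries: Seq[OffsetPosition]): Int =
  translateOffset(target, entries) match {
    case Some(entry) => entry.position   // begin reading here
    case None        => -1               // target is not in this segment
  }
{code}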
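For 5, the kind of filtering the ConsumerIterator could do after decompressing a wrapper message. MessageAndOffset here is a simplified stand-in:

{code:scala}
case class MessageAndOffset(offset: Long, payload: Array[Byte])

// After decompressing a compressed message set, drop the leading messages whose
// offsets precede the requested fetch offset, so that resuming from an offset
// committed inside the set does not redeliver them.
def skipConsumed(messages: Iterator[MessageAndOffset], fetchOffset: Long): Iterator[MessageAndOffset] =
  messages.dropWhile(_.offset < fetchOffset)
{code}

For 9.2, the general shape of folding the early return into an if-else expression; this is illustrative only, not the actual indexSlotFor body:

{code:scala}
// Find the largest slot whose offset is <= target, or -1 if the target
// precedes everything in the index. No early return: the whole body is
// a single if-else expression.
def indexSlotFor(offsets: IndexedSeq[Long], target: Long): Int =
  if (offsets.isEmpty || offsets.head > target) {
    -1
  } else {
    var low = 0
    var high = offsets.length - 1
    while (low < high) {
      val mid = low + (high - low + 1) / 2   // overflow-safe, biased upward
      if (offsets(mid) <= target) low = mid else high = mid - 1
    }
    low
  }
{code}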
> Store logical offset in log
> ---------------------------
>
> Key: KAFKA-506
> URL: https://issues.apache.org/jira/browse/KAFKA-506
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Jay Kreps
> Assignee: Jay Kreps
> Fix For: 0.8
>
> Attachments: KAFKA-506-phase-2.patch, KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch, KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch, KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch, KAFKA-506-v4-changes-since-v3.patch
>
> Currently we only support retention by dropping entire segment files.
> A more nuanced retention policy would allow dropping individual messages from a segment file by recopying it. This is not currently possible because the lookup structure we use to locate messages is based on the file offset directly.
> To fix this we should move to a sequential, logical offset (0,1,2,3,...) which would allow deleting individual messages (e.g. 2) without deleting the entire segment.
> It is desirable to make this change in the 0.8 timeframe since we are already doing data format changes.
> As part of this we would explicitly store the key field given by the producer for partitioning (right now there is no way for the consumer to find the value used for partitioning).
> This combination of features would allow a key-based retention policy that would clean obsolete values by a user-defined key.
> The specific use case I am targeting is a commit log for local state maintained by a process doing some kind of near-real-time processing. The process could log out its local state changes and be able to restore from this log in the event of a failure. However, I think this is a broadly useful feature.
> The following changes would be part of this:
> 1. The log format would now be
>    8 byte offset
>    4 byte message_size
>    N byte message
> 2. The offsets would be changed to a sequential, logical number rather than the byte offset (e.g. 0,1,2,3,...)
> 3. A local memory-mapped lookup structure will be kept for each log segment that contains the mapping from logical to physical offset.
> I propose to break this into two patches. The first makes the log format changes but retains the physical offset. The second adds the lookup structure and moves to logical offsets.
> Here are a few issues to be considered for the first patch:
> 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One surprising thing is that the offset is actually the offset of the next message. I think there are actually several uses for the current offset. I would propose making this hold the current message offset, since with logical offsets the next offset is always just current_offset+1. Note that since we no longer require messages to be dense, it is not true that if the next offset is N the current offset is N-1 (because N-1 may have been deleted). Thoughts or objections?
> 2. Currently, during iteration over a ByteBufferMessageSet, we throw an exception if there are zero messages in the set. This is used to detect fetches that are smaller than a single message size. I think this behavior is misplaced and should be moved up into the consumer.
> 3. In addition to adding a key in Message, I made two other changes: (1) I moved the CRC to the first field and made it cover the entire message contents (previously it only covered the payload), (2) I dropped support for Magic=0, effectively making the attributes field required, which simplifies the code (since we are breaking compatibility anyway).
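For reference, a minimal sketch of the entry layout described in point 1 of the issue above (8-byte offset, 4-byte message_size, N-byte message). The object and method names are hypothetical; this is not the patch's implementation:

{code:scala}
import java.nio.ByteBuffer

object LogEntryLayout {
  // Serialize one entry as [8-byte logical offset][4-byte size][N-byte message].
  def write(offset: Long, message: Array[Byte]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(8 + 4 + message.length)
    buffer.putLong(offset)         // 8 byte offset
    buffer.putInt(message.length)  // 4 byte message_size
    buffer.put(message)            // N byte message
    buffer.flip()
    buffer
  }

  // Read one entry back out of a buffer positioned at its start.
  def read(buffer: ByteBuffer): (Long, Array[Byte]) = {
    val offset = buffer.getLong()
    val size = buffer.getInt()
    val message = new Array[Byte](size)
    buffer.get(message)
    (offset, message)
  }
}
{code}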