[
https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stig Rohde Døssing resolved STORM-3090.
---
Resolution: Fixed
Fix Version/s: 1.2.3
Thanks [~choojoyq], merged to 1.x-branch. It doesn't cherry-pick cleanly onto
1.1.x/1.0.x, so please raise a new PR if you'd like the fix to go in those
versions.
For later reference, this is not merged to master, since we're planning to
delete storm-kafka there.
> The same offset value is used by the same partition number of different
> topics.
> ---
>
> Key: STORM-3090
> URL: https://issues.apache.org/jira/browse/STORM-3090
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka
> Affects Versions: 1.1.0, 1.0.4, 1.2.2
> Reporter: Nikita Gorbachevski
> Assignee: Nikita Gorbachevski
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.2.3
>
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> In the current implementation of `ZkCoordinator` deleted partition managers
> are used as state holders for newly created partition managers. This
> behaviour was introduced in the scope of
> [this|https://issues.apache.org/jira/browse/STORM-2296] ticket. However the
> existing lookup is based only on the partition number.
> {code:java}
> Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
> for (Partition id : deletedPartitions) {
>     deletedManagers.put(id.partition, _managers.remove(id));
> }
> for (PartitionManager manager : deletedManagers.values()) {
>     if (manager != null) manager.close();
> }
> LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
> for (Partition id : newPartitions) {
>     PartitionManager man = new PartitionManager(
>         _connections,
>         _topologyInstanceId,
>         _state,
>         _topoConf,
>         _spoutConfig,
>         id,
>         deletedManagers.get(id.partition));
>     _managers.put(id, man);
> }
> {code}
> This is incorrect, as the same task can manage multiple partitions that have
> the same number but belong to different topics. In that case all new
> partition managers obtain the same offset value from an arbitrary deleted
> partition manager (since `HashMap` iteration order is unspecified), and all
> fetch requests for the new partition managers fail with
> `TopicOffsetOutOfRangeException`. Some of them recover via the logic below
> when the assigned offset is smaller than the real one, but others keep
> failing repeatedly with an offset-out-of-range exception, preventing messages
> from being fetched from Kafka.
> {code:java}
> if (offset > _emittedToOffset) {
>     _lostMessageCount.incrBy(offset - _emittedToOffset);
>     _emittedToOffset = offset;
>     LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> }
> {code}
> I assume that the state holder lookup should be based on both the topic and
> the partition number.
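The collision described above can be sketched in a minimal self-contained example. Note this does not use Storm's actual classes: `Partition` here is an illustrative stand-in for storm-kafka's topic-plus-partition identifier, and the offsets are made up. It shows why a map keyed by partition number alone loses state when two topics share a partition number, while keying by the full (topic, partition) pair keeps both entries:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in for storm-kafka's Partition (topic + partition number).
class Partition {
    final String topic;
    final int partition;

    Partition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Partition)) return false;
        Partition p = (Partition) o;
        return partition == p.partition && topic.equals(p.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

public class PartitionKeyDemo {
    public static void main(String[] args) {
        Partition a = new Partition("topicA", 0);
        Partition b = new Partition("topicB", 0);

        // Buggy lookup: keyed by partition number only.
        // topicB-0 overwrites topicA-0, so one saved offset is lost and a
        // new partition manager may inherit the wrong topic's offset.
        Map<Integer, Long> byNumber = new HashMap<>();
        byNumber.put(a.partition, 100L);
        byNumber.put(b.partition, 200L);
        System.out.println("keyed by number: " + byNumber.size() + " entries");

        // Suggested fix: key by the full Partition (topic + number),
        // so each topic's state holder stays distinct.
        Map<Partition, Long> byPartition = new HashMap<>();
        byPartition.put(a, 100L);
        byPartition.put(b, 200L);
        System.out.println("keyed by topic+number: " + byPartition.size() + " entries");
    }
}
```

With partition-number keys the second `put` silently replaces the first, which is exactly why every new manager for partition 0 ends up seeded from a single arbitrary deleted manager.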
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)