[jira] [Resolved] (STORM-3090) The same offset value is used by the same partition number of different topics.

2018-07-09 Thread JIRA


 [ https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing resolved STORM-3090.
---
   Resolution: Fixed
Fix Version/s: 1.1.4

> The same offset value is used by the same partition number of different 
> topics.
> ---
>
> Key: STORM-3090
> URL: https://issues.apache.org/jira/browse/STORM-3090
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.1.0, 1.0.4, 1.2.2
>Reporter: Nikita Gorbachevski
>Assignee: Nikita Gorbachevski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.2.3, 1.1.4
>
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> In the current implementation of `ZkCoordinator`, deleted partition 
> managers are used as state holders for newly created partition managers. 
> This behaviour was introduced in the scope of the 
> [STORM-2296|https://issues-test.apache.org/jira/browse/STORM-2296] ticket. 
> However, the existing lookup is keyed only by the partition number.
> {code:java}
> // The map is keyed only by the partition number, so managers for
> // partitions with the same number in different topics collide.
> Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
> for (Partition id : deletedPartitions) {
>     deletedManagers.put(id.partition, _managers.remove(id));
> }
> for (PartitionManager manager : deletedManagers.values()) {
>     if (manager != null) manager.close();
> }
> LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
> for (Partition id : newPartitions) {
>     PartitionManager man = new PartitionManager(
>         _connections,
>         _topologyInstanceId,
>         _state,
>         _topoConf,
>         _spoutConfig,
>         id,
>         // May return state that belonged to a different topic.
>         deletedManagers.get(id.partition));
>     _managers.put(id, man);
> }
> {code}
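> For example (hypothetical topic names and values), two partitions that 
> differ only by topic map to the same integer key, so the second put 
> silently replaces the first:
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
>
> // Standalone sketch of the key collision, not storm-kafka code.
> Map<Integer, String> deletedByNumber = new HashMap<>();
> deletedByNumber.put(0, "offset state of topicA, partition 0");
> deletedByNumber.put(0, "offset state of topicB, partition 0"); // replaces topicA's entry
> // Both new managers for topicA/0 and topicB/0 now receive topicB's state:
> System.out.println(deletedByNumber.get(0));
> {code}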
> Keying the lookup by partition number alone is incorrect, because the same 
> task can manage multiple partitions that share a partition number but 
> belong to different topics. In that case every new partition manager 
> obtains its offset from whichever colliding deleted partition manager 
> happened to survive in the `HashMap`. All fetch requests for the new 
> partition managers then fail with `TopicOffsetOutOfRangeException`. Some of 
> them recover via the logic below, when the assigned offset is smaller than 
> the real one, but the others keep failing repeatedly with offset 
> out-of-range errors, preventing any messages from being fetched from Kafka.
> {code:java}
> // `offset` here is the earliest offset Kafka reports as available. A
> // manager whose assigned position fell behind it jumps forward (counting
> // the skipped range as lost); a position past the log end never satisfies
> // this check and keeps failing.
> if (offset > _emittedToOffset) {
>     _lostMessageCount.incrBy(offset - _emittedToOffset);
>     _emittedToOffset = offset;
>     LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> }
> {code}
> I assume the state holder lookup should be keyed by both the topic and the 
> partition number, as sketched below.
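> A minimal sketch of that idea (a sketch only, not necessarily the committed 
> patch; it assumes `Partition` exposes a public `topic` field alongside 
> `partition`):
> {code:java}
> // Key the temporary map by topic + partition number so that partitions
> // with the same number in different topics no longer collide.
> Map<String, PartitionManager> deletedManagers = new HashMap<>();
> for (Partition id : deletedPartitions) {
>     deletedManagers.put(id.topic + "/" + id.partition, _managers.remove(id));
> }
> for (PartitionManager manager : deletedManagers.values()) {
>     if (manager != null) manager.close();
> }
> for (Partition id : newPartitions) {
>     PartitionManager man = new PartitionManager(
>         _connections,
>         _topologyInstanceId,
>         _state,
>         _topoConf,
>         _spoutConfig,
>         id,
>         deletedManagers.get(id.topic + "/" + id.partition)); // same topic only
>     _managers.put(id, man);
> }
> {code}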





[jira] [Resolved] (STORM-3090) The same offset value is used by the same partition number of different topics.

2018-07-06 Thread JIRA


 [ https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing resolved STORM-3090.
---
   Resolution: Fixed
Fix Version/s: 1.2.3

Thanks [~choojoyq], merged to 1.x-branch. It doesn't cherry-pick cleanly onto 
1.1.x/1.0.x, so please raise a new PR if you'd like the fix to go in those 
versions.

For later reference, this is not merged to master, since we're planning to 
delete storm-kafka there.

> The same offset value is used by the same partition number of different 
> topics.
> ---
>
> Key: STORM-3090
> URL: https://issues.apache.org/jira/browse/STORM-3090
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.1.0, 1.0.4, 1.2.2
>Reporter: Nikita Gorbachevski
>Assignee: Nikita Gorbachevski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.2.3
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)