[jira] [Updated] (STORM-2947) Review and fix/remove deprecated things in Storm 2.0.0

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated STORM-2947:


This shouldn't be a blocker, though it might be nice to address...

> Review and fix/remove deprecated things in Storm 2.0.0
> --
>
> Key: STORM-2947
> URL: https://issues.apache.org/jira/browse/STORM-2947
> Project: Apache Storm
>  Issue Type: Task
>  Components: storm-client, storm-hdfs, storm-kafka, storm-server, 
> storm-solr
>Affects Versions: 2.0.0
>Reporter: Jungtaek Lim
>Assignee: Jungtaek Lim
>Priority: Major
>
> We've been deprecating things but haven't had time to replace or get rid of 
> them. It would be better if we took the time to review and address them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3100) Minor optimization: Replace HashMap with an array backed data structure for faster lookups

2018-07-09 Thread Roshan Naik (JIRA)


[ 
https://issues.apache.org/jira/browse/STORM-3100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16538051#comment-16538051
 ] 

Roshan Naik commented on STORM-3100:


fine with me.

> Minor optimization: Replace HashMap with an array backed data 
> structure for faster lookups
> --
>
> Key: STORM-3100
> URL: https://issues.apache.org/jira/browse/STORM-3100
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Roshan Naik
>Assignee: Roshan Naik
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> * Introduce _CustomIndexArray_: an array-backed data structure to speed up 
> HashMap use cases *in the critical path*. It needs to support negative 
> indexing and a user-defined (fixed at construction) upper and lower index 
> range. It does not need to be dynamically resizable given the nature of the 
> use cases we have.
>  * Use this data structure for _GeneralTopologyContext._taskToComponent_ 
> mapping which is looked up in the critical path _Task.getOutgoingTasks._ This 
> lookup happens at least once for every emit and consequently can happen 
> millions of times per second.
>  * Also use this for _JCQueue.localReceiveQueues_ where the basic idea is 
> already in use but not in a reusable manner.
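For illustration, the array-backed structure described above could look like this. This is a minimal sketch under the stated requirements (negative indexing, fixed bounds); the class and method names are hypothetical, not Storm's actual implementation.

```java
// Hypothetical sketch of an array-backed lookup keyed by a bounded integer
// range, allowing negative indices. The size is fixed at construction, as
// the description requires; get/set are O(1) with no hashing.
public class CustomIndexArray<T> {
    private final Object[] elements;
    private final int lowerBound;

    public CustomIndexArray(int lowerBound, int upperBound) {
        if (upperBound < lowerBound) {
            throw new IllegalArgumentException("upperBound must be >= lowerBound");
        }
        this.lowerBound = lowerBound;
        this.elements = new Object[upperBound - lowerBound + 1];
    }

    @SuppressWarnings("unchecked")
    public T get(int idx) {
        return (T) elements[idx - lowerBound];  // shift index into array range
    }

    public void set(int idx, T value) {
        elements[idx - lowerBound] = value;
    }
}
```

A lookup like `taskToComponent` would then be a plain array access on the (bounded) task id instead of a HashMap probe.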



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-2963) Updates to Performance.md

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-2963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated STORM-2963:


I don't think this is a blocker for Storm 2.0.0, since Performance.md already 
covers all performance patches, and there have been no further performance 
patches (aside from minor ones) after STORM-2958. Will remove the epic link.

> Updates to Performance.md 
> --
>
> Key: STORM-2963
> URL: https://issues.apache.org/jira/browse/STORM-2963
> Project: Apache Storm
>  Issue Type: Documentation
>Affects Versions: 2.0.0
>Reporter: Roshan Naik
>Assignee: Roshan Naik
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-3100) Minor optimization: Replace HashMap with an array backed data structure for faster lookups

2018-07-09 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/STORM-3100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16537969#comment-16537969
 ] 

Jungtaek Lim commented on STORM-3100:
-

I would remove the epic link for this, since it is an improvement rather than 
a blocker for Storm 2.0.0. There's still a chance for it to be included in 
Storm 2.0.0, but we shouldn't explicitly mark it as a blocker.

> Minor optimization: Replace HashMap with an array backed data 
> structure for faster lookups
> --
>
> Key: STORM-3100
> URL: https://issues.apache.org/jira/browse/STORM-3100
> Project: Apache Storm
>  Issue Type: Improvement
>Reporter: Roshan Naik
>Assignee: Roshan Naik
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> * Introduce _CustomIndexArray_: an array-backed data structure to speed up 
> HashMap use cases *in the critical path*. It needs to support negative 
> indexing and a user-defined (fixed at construction) upper and lower index 
> range. It does not need to be dynamically resizable given the nature of the 
> use cases we have.
>  * Use this data structure for _GeneralTopologyContext._taskToComponent_ 
> mapping which is looked up in the critical path _Task.getOutgoingTasks._ This 
> lookup happens at least once for every emit and consequently can happen 
> millions of times per second.
>  * Also use this for _JCQueue.localReceiveQueues_ where the basic idea is 
> already in use but not in a reusable manner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-2945) Nail down and document how to support background emits in Spouts and Bolts

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-2945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated STORM-2945:


Removing epic link since I don't think this is a blocker for Storm 2.0.0.

> Nail down and document how to support background emits in Spouts and Bolts
> --
>
> Key: STORM-2945
> URL: https://issues.apache.org/jira/browse/STORM-2945
> Project: Apache Storm
>  Issue Type: Documentation
>Affects Versions: 2.0.0
>Reporter: Roshan Naik
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3056) Add a test for quickly rebinding to a port

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3056.
-
   Resolution: Fixed
Fix Version/s: 2.0.0

Thanks [~raghavgautam], I merged into master.

It doesn't cherry-pick cleanly into 1.x-branch or 1.1.x-branch. It would be 
nice if you could submit a patch for 1.x-branch, but applying it only to the 
master branch would be OK.

> Add a test for quickly rebinding to a port
> --
>
> Key: STORM-3056
> URL: https://issues.apache.org/jira/browse/STORM-3056
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 1.1.2, 1.2.1
>Reporter: Raghav Kumar Gautam
>Assignee: Raghav Kumar Gautam
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> We need to add a test for the bug fix of STORM-3039. We try to rebind to port 
> 6700 a few times and expect it to be usable quickly.
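A test along these lines could be sketched as follows. This is not the actual Storm test, just an illustration of rebinding a port quickly: `SO_REUSEADDR` lets a bind succeed even if the previous socket lingers in TIME_WAIT, and the helper picks a free port rather than hard-coding 6700.

```java
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Illustrative sketch: bind, close, and immediately rebind the same port.
public class RebindSketch {
    public static void bindAndClose(int port) throws Exception {
        ServerSocket socket = new ServerSocket();
        socket.setReuseAddress(true);  // must be set before bind()
        socket.bind(new InetSocketAddress(port));
        socket.close();
    }

    public static int freePort() throws Exception {
        // Ask the OS for an unused ephemeral port.
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        }
    }
}
```

Each call to `bindAndClose` should succeed immediately; a failed bind in the loop would indicate the regression covered by STORM-3039.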



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3136) Fix flaky integration test, and make it more readable

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3136.
-
   Resolution: Fixed
Fix Version/s: 2.0.0

Thanks [~Srdo], I merged into master.

> Fix flaky integration test, and make it more readable
> -
>
> Key: STORM-3136
> URL: https://issues.apache.org/jira/browse/STORM-3136
> Project: Apache Storm
>  Issue Type: Bug
>  Components: integration-test
>Affects Versions: 2.0.0
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The integration test appears flaky, e.g. 
> https://travis-ci.org/apache/storm/jobs/397471420.
> We should try to fix this. I also find the integration test code hard to read 
> and understand, so I'd like to do some cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3143) Unnecessary inclusion of empty match result in Json

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3143.
-
Resolution: Fixed

Thanks [~zhengdai], I merged into master.

> Unnecessary inclusion of empty match result in Json
> ---
>
> Key: STORM-3143
> URL: https://issues.apache.org/jira/browse/STORM-3143
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-webapp
>Affects Versions: 2.0.0
>Reporter: Zhengdai Hu
>Assignee: Zhengdai Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> `FindNMatches()` didn't correctly filter out empty match results in 
> `substringSearch()` and hence sent back an empty map to the user. I don't 
> know if this is the desired behavior, but fixing the current behavior will 
> make metrics for the logviewer easier to implement. 
> An example of current behavior:
> {code:json}
> {
>     "fileOffset": 1,
>     "searchString": "sdf",
>     "matches": [
>         {
>             "searchString": "sdf",
>             "fileName": "word-count-1-1530815972/6701/worker.log",
>             "matches": [],
>             "port": "6701",
>             "isDaemon": "no",
>             "startByteOffset": 0
>         }
>     ]
> }
> {code}
> Desired behavior:
> {code:json}
> {
>     "fileOffset": 1,
>     "searchString": "sdf",
>     "matches": []
> }
> {code}
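The desired filtering could be sketched like this, dropping per-file entries whose inner "matches" list is empty before building the response. This is a simplified illustration; the method name is made up, and the real logic lives in the logviewer's search code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep only per-file results that contain actual hits,
// so the response never includes the empty inner objects shown above.
public class MatchFilterSketch {
    public static List<Map<String, Object>> dropEmptyMatches(
            List<Map<String, Object>> fileResults) {
        List<Map<String, Object>> filtered = new ArrayList<>();
        for (Map<String, Object> fileResult : fileResults) {
            Object matches = fileResult.get("matches");
            if (matches instanceof List && !((List<?>) matches).isEmpty()) {
                filtered.add(fileResult);  // file had at least one match
            }
        }
        return filtered;
    }
}
```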



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3082) NamedTopicFilter can't handle topics that don't exist yet

2018-07-09 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/STORM-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-3082:
--
Affects Version/s: 1.1.3

> NamedTopicFilter can't handle topics that don't exist yet
> -
>
> Key: STORM-3082
> URL: https://issues.apache.org/jira/browse/STORM-3082
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1, 1.1.3
>Reporter: Stig Rohde Døssing
>Assignee: Aniket Alhat
>Priority: Minor
>  Labels: newbie, pull-request-available
> Fix For: 2.0.0, 1.2.3, 1.1.4
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> [~aniket.alhat] reported on the mailing list that he got an NPE when trying 
> to start the Trident spout.
> {code}
> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
> java.lang.NullPointerException: null
> at 
> org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> It looks to me like the partitionsFor method on the consumer will return null 
> if the specified topic doesn't exist. We didn't account for this in the 
> filter, because the return type of the method is a List, and we assumed it 
> wouldn't be null.
> I think it's reasonable that people should be able to subscribe to topics 
> that don't exist yet, and the spout should pick up the new topics eventually.
> We should check for null here 
> https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55,
>  and maybe log a warning if the returned value is null.
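A minimal sketch of the proposed guard could look like the following. Here `partitionsFor` is stubbed as a function, since the real call is a method on the Kafka consumer; all names are illustrative, not the actual `NamedTopicFilter` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the null guard: partitionsFor stands in for
// KafkaConsumer.partitionsFor, which may return null for a topic that does
// not exist yet. Missing topics are logged and skipped instead of causing
// an NPE, so the spout can pick them up once they are created.
public class NullGuardSketch {
    public static List<Integer> filteredPartitions(
            Function<String, List<Integer>> partitionsFor, Set<String> topics) {
        List<Integer> result = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Warn rather than fail; the topic may be created later.
                System.err.println("Topic " + topic + " not found yet, skipping");
                continue;
            }
            result.addAll(partitions);
        }
        return result;
    }
}
```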



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3082) NamedTopicFilter can't handle topics that don't exist yet

2018-07-09 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/STORM-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-3082.
---
   Resolution: Fixed
Fix Version/s: 1.1.4
   1.2.3
   2.0.0

Thanks [~aniket.alhat]. Merged via 2e3f767353be09bf08b2c74738523acfac4a9491 
(master), 4e1e62667d97fe684b1d73cd8a1c41d3175cf6bc (1.x) and 
487b81d0773441c8e8ea0542b973616ad93a3f36 (1.1.x)

> NamedTopicFilter can't handle topics that don't exist yet
> -
>
> Key: STORM-3082
> URL: https://issues.apache.org/jira/browse/STORM-3082
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client
>Affects Versions: 2.0.0, 1.2.1, 1.1.3
>Reporter: Stig Rohde Døssing
>Assignee: Aniket Alhat
>Priority: Minor
>  Labels: newbie, pull-request-available
> Fix For: 2.0.0, 1.2.3, 1.1.4
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> [~aniket.alhat] reported on the mailing list that he got an NPE when trying 
> to start the Trident spout.
> {code}
> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
> java.lang.NullPointerException: null
> at 
> org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50)
>  ~[stormjar.jar:?]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245)
>  ~[storm-core-1.2.1.jar:1.2.1]
> at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) 
> ~[storm-core-1.2.1.jar:1.2.1]
> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) 
> [storm-core-1.2.1.jar:1.2.1]
> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> It looks to me like the partitionsFor method on the consumer will return null 
> if the specified topic doesn't exist. We didn't account for this in the 
> filter, because the return type of the method is a List, and we assumed it 
> wouldn't be null.
> I think it's reasonable that people should be able to subscribe to topics 
> that don't exist yet, and the spout should pick up the new topics eventually.
> We should check for null here 
> https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55,
>  and maybe log a warning if the returned value is null.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3028) HdfsSpout does not handle empty files in case of ack enabled

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3028.
-
   Resolution: Fixed
Fix Version/s: 1.2.3
   2.0.0

Thanks [~ghajos], I merged into master and 1.x-branch.

> HdfsSpout does not handle empty files in case of ack enabled
> 
>
> Key: STORM-3028
> URL: https://issues.apache.org/jira/browse/STORM-3028
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-hdfs
>Affects Versions: 1.2.1
>Reporter: Gergely Hajós
>Assignee: Gergely Hajós
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.2.3
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> When ackers are present and an empty file is read, HdfsSpout waits for an 
> acknowledgement before closing the file. As nothing was emitted, no ack 
> arrives, so HdfsSpout stays in the "fileReadCompletely" state forever.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3090) The same offset value is used by the same partition number of different topics.

2018-07-09 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-3090.
---
   Resolution: Fixed
Fix Version/s: 1.1.4

> The same offset value is used by the same partition number of different 
> topics.
> ---
>
> Key: STORM-3090
> URL: https://issues.apache.org/jira/browse/STORM-3090
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.1.0, 1.0.4, 1.2.2
>Reporter: Nikita Gorbachevski
>Assignee: Nikita Gorbachevski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.2.3, 1.1.4
>
>  Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> In the current implementation of `ZkCoordinator`, deleted partition managers 
> are used as state holders for newly created partition managers. This 
> behaviour was introduced in 
> [this|https://issues-test.apache.org/jira/browse/STORM-2296] ticket. 
> However, the existing lookup is based only on the partition number.
> {code:java}
> Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
> for (Partition id : deletedPartitions) {
>     deletedManagers.put(id.partition, _managers.remove(id));
> }
> for (PartitionManager manager : deletedManagers.values()) {
>     if (manager != null) manager.close();
> }
> LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
> for (Partition id : newPartitions) {
>     PartitionManager man = new PartitionManager(
>         _connections,
>         _topologyInstanceId,
>         _state,
>         _topoConf,
>         _spoutConfig,
>         id,
>         deletedManagers.get(id.partition));
>     _managers.put(id, man);
> }
> {code}
> This is definitely incorrect, as the same task is able to manage multiple 
> partitions with the same number but belonging to different topics. In that 
> case all new partition managers obtain the same offset value from a random 
> deleted partition manager (as `HashMap` is used), and all fetch requests for 
> the new partition managers fail with `TopicOffsetOutOfRangeException`. Some 
> of them recover via this logic if the assigned offset is smaller than the 
> real one, but others continue to repeatedly fail with offset-out-of-range 
> exceptions, preventing messages from being fetched from Kafka.
> {code:java}
> if (offset > _emittedToOffset) {
>  _lostMessageCount.incrBy(offset - _emittedToOffset);
>  _emittedToOffset = offset;
>  LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> }
> {code}
> I assume that the state holder lookup should be based on both topic and 
> partition number.
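The suggested fix could be sketched as follows, keying the lookup on a (topic, partition) pair instead of the bare partition number. `Map.entry` serves as a stand-in composite key here; the real code would presumably key on the spout's `Partition` objects directly.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: with a composite (topic, partition) key, partition 0
// of topicA and partition 0 of topicB no longer collide, so each new
// partition manager inherits the state of its own predecessor.
public class CompositeKeySketch {
    public static void main(String[] args) {
        Map<Map.Entry<String, Integer>, String> deletedManagers = new HashMap<>();
        deletedManagers.put(Map.entry("topicA", 0), "offset-state-A0");
        deletedManagers.put(Map.entry("topicB", 0), "offset-state-B0");

        // Keyed only by the partition number, one entry would overwrite the
        // other; with the composite key both lookups stay distinct.
        System.out.println(deletedManagers.get(Map.entry("topicA", 0)));
        System.out.println(deletedManagers.get(Map.entry("topicB", 0)));
    }
}
```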



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (STORM-3090) The same offset value is used by the same partition number of different topics.

2018-07-09 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reopened STORM-3090:
---

> The same offset value is used by the same partition number of different 
> topics.
> ---
>
> Key: STORM-3090
> URL: https://issues.apache.org/jira/browse/STORM-3090
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.1.0, 1.0.4, 1.2.2
>Reporter: Nikita Gorbachevski
>Assignee: Nikita Gorbachevski
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.2.3, 1.1.4
>
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> In the current implementation of `ZkCoordinator`, deleted partition managers 
> are used as state holders for newly created partition managers. This 
> behaviour was introduced in 
> [this|https://issues-test.apache.org/jira/browse/STORM-2296] ticket. 
> However, the existing lookup is based only on the partition number.
> {code:java}
> Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
> for (Partition id : deletedPartitions) {
>     deletedManagers.put(id.partition, _managers.remove(id));
> }
> for (PartitionManager manager : deletedManagers.values()) {
>     if (manager != null) manager.close();
> }
> LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
> for (Partition id : newPartitions) {
>     PartitionManager man = new PartitionManager(
>         _connections,
>         _topologyInstanceId,
>         _state,
>         _topoConf,
>         _spoutConfig,
>         id,
>         deletedManagers.get(id.partition));
>     _managers.put(id, man);
> }
> {code}
> This is definitely incorrect, as the same task is able to manage multiple 
> partitions with the same number but belonging to different topics. In that 
> case all new partition managers obtain the same offset value from a random 
> deleted partition manager (as `HashMap` is used), and all fetch requests for 
> the new partition managers fail with `TopicOffsetOutOfRangeException`. Some 
> of them recover via this logic if the assigned offset is smaller than the 
> real one, but others continue to repeatedly fail with offset-out-of-range 
> exceptions, preventing messages from being fetched from Kafka.
> {code:java}
> if (offset > _emittedToOffset) {
>  _lostMessageCount.incrBy(offset - _emittedToOffset);
>  _emittedToOffset = offset;
>  LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
> }
> {code}
> I assume that the state holder lookup should be based on both topic and 
> partition number.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3127) Avoid potential race condition

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3127.
-
Resolution: Fixed

Thanks [~zhengdai], I merged into master.

> Avoid potential race condition 
> ---
>
> Key: STORM-3127
> URL: https://issues.apache.org/jira/browse/STORM-3127
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-server
>Affects Versions: 2.0.0
>Reporter: Zhengdai Hu
>Assignee: Zhengdai Hu
>Priority: Minor
> Fix For: 2.0.0
>
>
> PortAndAssignment and its callback are added after the update to a blob has 
> already been invoked asynchronously. It is not guaranteed that the new 
> dependent worker will be registered before the blob informs listening 
> workers of its update. This can be fixed by moving the addReference call up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3126) Avoid unnecessary force kill when invoking storm kill_workers

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3126.
-
Resolution: Fixed

Thanks [~zhengdai], I merged into master.

> Avoid unnecessary force kill when invoking storm kill_workers
> -
>
> Key: STORM-3126
> URL: https://issues.apache.org/jira/browse/STORM-3126
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-server
>Affects Versions: 2.0.0
>Reporter: Zhengdai Hu
>Assignee: Zhengdai Hu
>Priority: Minor
> Fix For: 2.0.0
>
>
> The supervisor tries to force kill a worker before checking whether it has 
> already died, leading to an unnecessary force kill call. This is minor but 
> does help clean up the logs a little bit. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3125) Refactoring methods in components for Supervisor and DRPC

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3125.
-
Resolution: Fixed

Thanks [~zhengdai], I merged into master.

> Refactoring methods in components for Supervisor and DRPC
> -
>
> Key: STORM-3125
> URL: https://issues.apache.org/jira/browse/STORM-3125
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-server
>Affects Versions: 2.0.0
>Reporter: Zhengdai Hu
>Assignee: Zhengdai Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> This is a supplementary issue to STORM-3099, separating the refactoring 
> work out from the metrics addition.
> A few miscellaneous bugs discovered during refactoring have been 
> incorporated into this issue as well. See links for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3143) Unnecessary inclusion of empty match result in Json

2018-07-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated STORM-3143:
--
Labels: pull-request-available  (was: )

> Unnecessary inclusion of empty match result in Json
> ---
>
> Key: STORM-3143
> URL: https://issues.apache.org/jira/browse/STORM-3143
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-webapp
>Affects Versions: 2.0.0
>Reporter: Zhengdai Hu
>Assignee: Zhengdai Hu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> `FindNMatches()` didn't correctly filter out empty match results in 
> `substringSearch()` and hence sent back an empty map to the user. I don't 
> know if this is the desired behavior, but fixing the current behavior will 
> make metrics for the logviewer easier to implement. 
> An example of current behavior:
> {code:json}
> {
>     "fileOffset": 1,
>     "searchString": "sdf",
>     "matches": [
>         {
>             "searchString": "sdf",
>             "fileName": "word-count-1-1530815972/6701/worker.log",
>             "matches": [],
>             "port": "6701",
>             "isDaemon": "no",
>             "startByteOffset": 0
>         }
>     ]
> }
> {code}
> Desired behavior:
> {code:json}
> {
>     "fileOffset": 1,
>     "searchString": "sdf",
>     "matches": []
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3046.
-
   Resolution: Fixed
Fix Version/s: 2.0.0

Thanks [~Srdo], I merged into master.

The patch wasn't applied to 1.x-branch cleanly. Could you please raise a PR for 
1.x-branch as well? Thanks in advance!

> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Assignee: Stig Rohde Døssing
>Priority: Blocker
>  Labels: kafka, pull-request-available, storm-kafka-client, 
> trident
> Fix For: 2.0.0
>
> Attachments: TestTopology.java
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
>  Whenever I try to run the topology, I get an NPE, which leads to the worker 
> dying.
> If I set the poll strategy to earliest and the topic already contains some 
> messages, it works fine.
>  I have used a custom record translator, which is working fine.
>  Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>  
> private static StormTopology buildTopology(Properties stormProperties) {
>  
> Properties kafkaProperties = getProperties("/kafka.properties");
>  TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> "test")
>  

[jira] [Resolved] (STORM-3047) Ensure Trident emitter refreshPartitions is only called with partitions assigned to the emitter

2018-07-09 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/STORM-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved STORM-3047.
-
   Resolution: Fixed
Fix Version/s: 1.1.4
   1.2.3

Thanks [~Srdo], I merged into 1.x-branch and 1.1.x-branch respectively.

> Ensure Trident emitter refreshPartitions is only called with partitions 
> assigned to the emitter
> ---
>
> Key: STORM-3047
> URL: https://issues.apache.org/jira/browse/STORM-3047
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 1.1.2, 1.2.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.2.3, 1.1.4
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is a backport of the changes made to 
> OpaquePartitionedTridentSpoutExecutor in 
> https://github.com/apache/storm/pull/2300/files.
> The description of the issue is copied here for convenience:
> The changes in https://github.com/apache/storm/pull/2009, released in 1.1.0, 
> modified the OpaquePartitionedTridentSpoutExecutor in a way that likely 
> broke IOpaquePartitionedTridentSpout implementations other than 
> storm-kafka-client. The changed code used to request sorted partitions from 
> the spout via getOrderedPartitions, do a round-robin partitioning, and assign 
> partitions via refreshPartitions 
> https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100.
>  The new code just passes the output of getOrderedPartitions into 
> refreshPartitions 
> https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120.
>  It looks to me like refreshPartitions is passed the list of all partitions 
> assigned to any spout task, rather than just the partitions assigned to the 
> current task.
> The proposed fix will use getOrderedPartitions to get the sorted partitions 
> list, pass the list into getPartitionsForTask, and pass the resulting list of 
> assigned partitions back into refreshPartitions.
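The task-local selection described in the proposed fix can be sketched as a simple round-robin pick. This is a simplified stand-in for `getPartitionsForTask` (the real executor works with the spout's partition objects, not strings):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: from the globally ordered partition list, a task
// takes every numTasks-th entry starting at its own index; only those
// partitions should then be passed on to refreshPartitions.
public class PartitionAssignmentSketch {
    public static <P> List<P> partitionsForTask(
            int taskIndex, int numTasks, List<P> orderedPartitions) {
        List<P> assigned = new ArrayList<>();
        for (int i = taskIndex; i < orderedPartitions.size(); i += numTasks) {
            assigned.add(orderedPartitions.get(i));  // round-robin pick
        }
        return assigned;
    }
}
```

The bug described above amounts to skipping this step and handing the full ordered list to every task.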



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)