[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak
[ https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270412#comment-14270412 ] ASF GitHub Bot commented on STORM-378: -- Github user caofangkun closed the pull request at: https://github.com/apache/storm/pull/295 SleepSpoutWaitStrategy.emptyEmit should use the variable streak -- Key: STORM-378 URL: https://issues.apache.org/jira/browse/STORM-378 Project: Apache Storm Issue Type: Bug Affects Versions: 0.9.2-incubating Reporter: caofangkun Priority: Minor {code:java} Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java === --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (revision 2868) +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (working copy) @@ -18,6 +18,8 @@ package backtype.storm.spout; import backtype.storm.Config; +import backtype.storm.utils.Utils; + import java.util.Map; @@ -27,13 +29,14 @@ @Override public void prepare(Map conf) { -sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); +sleepMillis = Utils.getLong( +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500); } @Override public void emptyEmit(long streak) { try { -Thread.sleep(sleepMillis); +Thread.sleep(Math.abs(sleepMillis + streak)); } catch (InterruptedException e) { throw new RuntimeException(e); } Index: src/jvm/backtype/storm/utils/Utils.java === --- src/jvm/backtype/storm/utils/Utils.java (revision 2888) +++ src/jvm/backtype/storm/utils/Utils.java (working copy) @@ -325,6 +325,24 @@ throw new IllegalArgumentException(Don't know how to convert + o + + to int); } } + +public static Long getLong(Object o, long defaultValue) { + + if (o == null) { +return defaultValue; + } + + if (o instanceof String) { +return Long.valueOf(String.valueOf(o)); + } else if (o instanceof Integer) { +Integer value = (Integer) o; +return Long.valueOf((Integer) value); + } else if (o instanceof Long) { +return (Long) o; + } else {
+return defaultValue; + } +} public static boolean getBoolean(Object o, boolean defaultValue) { if (null == o) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak
[ https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14198850#comment-14198850 ] ASF GitHub Bot commented on STORM-378: -- Github user ptgoetz commented on the pull request: https://github.com/apache/storm/pull/295#issuecomment-61860848 -1 I agree with @HeartSaVioR that if we want an implementation of `ISpoutWaitStrategy` that takes into account the `streak` parameter, it should be a separate implementation and `SleepSpoutWaitStrategy` should be left as-is. SleepSpoutWaitStrategy.emptyEmit should use the variable streak -- Key: STORM-378 URL: https://issues.apache.org/jira/browse/STORM-378 Project: Apache Storm Issue Type: Bug Affects Versions: 0.9.2-incubating Reporter: caofangkun Priority: Minor {code:java} Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java === --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (revision 2868) +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (working copy) @@ -18,6 +18,8 @@ package backtype.storm.spout; import backtype.storm.Config; +import backtype.storm.utils.Utils; + import java.util.Map; @@ -27,13 +29,14 @@ @Override public void prepare(Map conf) { -sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); +sleepMillis = Utils.getLong( +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500); } @Override public void emptyEmit(long streak) { try { -Thread.sleep(sleepMillis); +Thread.sleep(Math.abs(sleepMillis + streak)); } catch (InterruptedException e) { throw new RuntimeException(e); } Index: src/jvm/backtype/storm/utils/Utils.java === --- src/jvm/backtype/storm/utils/Utils.java (revision 2888) +++ src/jvm/backtype/storm/utils/Utils.java (working copy) @@ -325,6 +325,24 @@ throw new IllegalArgumentException(Don't know how to convert + o + + to int); } } + +public static Long getLong(Object o, long defaultValue) { + + if (o == null) { +return defaultValue; + }
+ + if (o instanceof String) { +return Long.valueOf(String.valueOf(o)); + } else if (o instanceof Integer) { +Integer value = (Integer) o; +return Long.valueOf((Integer) value); + } else if (o instanceof Long) { +return (Long) o; + } else { +return defaultValue; + } +} public static boolean getBoolean(Object o, boolean defaultValue) { if (null == o) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak
[ https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197697#comment-14197697 ] ASF GitHub Bot commented on STORM-378: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/295#issuecomment-61767078 To be clear, this PR aims to increase/decrease the sleep time based on the streak. The fixed sleep time is configurable, so it doesn't matter how long it is; the optimal value will vary with the workload. I think we can add a new ISpoutWaitStrategy implementation that makes use of the streak if we really need it. We definitely need a wait strategy that always sleeps for the same amount of time (e.g. 1 ms), so it isn't a good idea to change the existing class's behavior. SleepSpoutWaitStrategy.emptyEmit should use the variable streak -- Key: STORM-378 URL: https://issues.apache.org/jira/browse/STORM-378 Project: Apache Storm Issue Type: Bug Affects Versions: 0.9.2-incubating Reporter: caofangkun Priority: Minor {code:java} Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java === --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (revision 2868) +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (working copy) @@ -18,6 +18,8 @@ package backtype.storm.spout; import backtype.storm.Config; +import backtype.storm.utils.Utils; + import java.util.Map; @@ -27,13 +29,14 @@ @Override public void prepare(Map conf) { -sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); +sleepMillis = Utils.getLong( +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500); } @Override public void emptyEmit(long streak) { try { -Thread.sleep(sleepMillis); +Thread.sleep(Math.abs(sleepMillis + streak)); } catch (InterruptedException e) { throw new RuntimeException(e); } Index: src/jvm/backtype/storm/utils/Utils.java === --- src/jvm/backtype/storm/utils/Utils.java (revision 2888) +++ src/jvm/backtype/storm/utils/Utils.java (working copy) @@ -325,6
+325,24 @@ throw new IllegalArgumentException(Don't know how to convert + o + + to int); } } + +public static Long getLong(Object o, long defaultValue) { + + if (o == null) { +return defaultValue; + } + + if (o instanceof String) { +return Long.valueOf(String.valueOf(o)); + } else if (o instanceof Integer) { +Integer value = (Integer) o; +return Long.valueOf((Integer) value); + } else if (o instanceof Long) { +return (Long) o; + } else { +return defaultValue; + } +} public static boolean getBoolean(Object o, boolean defaultValue) { if (null == o) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak
[ https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14183698#comment-14183698 ] ASF GitHub Bot commented on STORM-378: -- Github user HeartSaVioR commented on the pull request: https://github.com/apache/storm/pull/295#issuecomment-60460916 @caofangkun It's mk-threads in executor.clj. ``` (if (and (= curr-count (.get emitted-count)) active?) (do (.increment empty-emit-streak) (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) (.set empty-emit-streak 0) )) ``` You can see that the streak is incremented by 1 each time, so I think it is intended for alternative implementations of ISpoutWaitStrategy, not SleepSpoutWaitStrategy. (@nathanmarz Could you please confirm?) If it is intended for SleepSpoutWaitStrategy, just adding it to sleepMillis barely affects the sleep time. The streak should be multiplied by 10 or something bigger, or maybe we can apply an exponential function to the already-multiplied streak. SleepSpoutWaitStrategy.emptyEmit should use the variable streak -- Key: STORM-378 URL: https://issues.apache.org/jira/browse/STORM-378 Project: Apache Storm Issue Type: Bug Affects Versions: 0.9.2-incubating Reporter: caofangkun Priority: Minor {code:java} Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java === --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (revision 2868) +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java (working copy) @@ -18,6 +18,8 @@ package backtype.storm.spout; import backtype.storm.Config; +import backtype.storm.utils.Utils; + import java.util.Map; @@ -27,13 +29,14 @@ @Override public void prepare(Map conf) { -sleepMillis = ((Number) conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue(); +sleepMillis = Utils.getLong( +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 500); } @Override public void emptyEmit(long streak) { try { -Thread.sleep(sleepMillis); +Thread.sleep(Math.abs(sleepMillis + streak)); } catch (InterruptedException e) { throw new
RuntimeException(e); } Index: src/jvm/backtype/storm/utils/Utils.java === --- src/jvm/backtype/storm/utils/Utils.java (revision 2888) +++ src/jvm/backtype/storm/utils/Utils.java (working copy) @@ -325,6 +325,24 @@ throw new IllegalArgumentException(Don't know how to convert + o + + to int); } } + +public static Long getLong(Object o, long defaultValue) { + + if (o == null) { +return defaultValue; + } + + if (o instanceof String) { +return Long.valueOf(String.valueOf(o)); + } else if (o instanceof Integer) { +Integer value = (Integer) o; +return Long.valueOf((Integer) value); + } else if (o instanceof Long) { +return (Long) o; + } else { +return defaultValue; + } +} public static boolean getBoolean(Object o, boolean defaultValue) { if (null == o) { {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)