[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak

2015-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14270412#comment-14270412
 ] 

ASF GitHub Bot commented on STORM-378:
--

Github user caofangkun closed the pull request at:

https://github.com/apache/storm/pull/295


 SleepSpoutWaitStrategy.emptyEmit should use  the variable streak
 --

 Key: STORM-378
 URL: https://issues.apache.org/jira/browse/STORM-378
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: caofangkun
Priority: Minor

 {code:java}
 Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
 ===
 --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (revision 2868)
 +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (working copy)
 @@ -18,6 +18,8 @@
  package backtype.storm.spout;
  
  import backtype.storm.Config;
 +import backtype.storm.utils.Utils;
 +
  import java.util.Map;
  
  
 @@ -27,13 +29,14 @@
  
  @Override
  public void prepare(Map conf) {
 -sleepMillis = ((Number) 
 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
 +sleepMillis = Utils.getLong(
 +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 
 500);
  }
  
  @Override
  public void emptyEmit(long streak) {
  try {
 -Thread.sleep(sleepMillis);
 +Thread.sleep(Math.abs(sleepMillis + streak));
  } catch (InterruptedException e) {
  throw new RuntimeException(e);
  }
 Index: src/jvm/backtype/storm/utils/Utils.java
 ===
 --- src/jvm/backtype/storm/utils/Utils.java   (revision 2888)
 +++ src/jvm/backtype/storm/utils/Utils.java   (working copy)
 @@ -325,6 +325,24 @@
throw new IllegalArgumentException(Don't know how to convert  + 
 o +  + to int);
}
  }
 +
 +public static Long getLong(Object o, long defaultValue) {
 +
 +  if (o == null) {
 +return defaultValue;
 +  }
 +
 +  if (o instanceof String) {
 +return Long.valueOf(String.valueOf(o));
 +  } else if (o instanceof Integer) {
 +Integer value = (Integer) o;
 +return Long.valueOf((Integer) value);
 +  } else if (o instanceof Long) {
 +return (Long) o;
 +  } else {
 +return defaultValue;
 +  }
 +}
  
  public static boolean getBoolean(Object o, boolean defaultValue) {
if (null == o) {
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak

2014-11-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14198850#comment-14198850
 ] 

ASF GitHub Bot commented on STORM-378:
--

Github user ptgoetz commented on the pull request:

https://github.com/apache/storm/pull/295#issuecomment-61860848
  
-1

I agree with @HeartSaVioR that if we want an implementation of 
`ISpoutWaitStrategy` that takes into account the `streak` parameter, it should 
be a separate implementation and `SleepSpoutWaitStrategy` should be left as-is.


 SleepSpoutWaitStrategy.emptyEmit should use  the variable streak
 --

 Key: STORM-378
 URL: https://issues.apache.org/jira/browse/STORM-378
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: caofangkun
Priority: Minor

 {code:java}
 Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
 ===
 --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (revision 2868)
 +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (working copy)
 @@ -18,6 +18,8 @@
  package backtype.storm.spout;
  
  import backtype.storm.Config;
 +import backtype.storm.utils.Utils;
 +
  import java.util.Map;
  
  
 @@ -27,13 +29,14 @@
  
  @Override
  public void prepare(Map conf) {
 -sleepMillis = ((Number) 
 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
 +sleepMillis = Utils.getLong(
 +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 
 500);
  }
  
  @Override
  public void emptyEmit(long streak) {
  try {
 -Thread.sleep(sleepMillis);
 +Thread.sleep(Math.abs(sleepMillis + streak));
  } catch (InterruptedException e) {
  throw new RuntimeException(e);
  }
 Index: src/jvm/backtype/storm/utils/Utils.java
 ===
 --- src/jvm/backtype/storm/utils/Utils.java   (revision 2888)
 +++ src/jvm/backtype/storm/utils/Utils.java   (working copy)
 @@ -325,6 +325,24 @@
throw new IllegalArgumentException(Don't know how to convert  + 
 o +  + to int);
}
  }
 +
 +public static Long getLong(Object o, long defaultValue) {
 +
 +  if (o == null) {
 +return defaultValue;
 +  }
 +
 +  if (o instanceof String) {
 +return Long.valueOf(String.valueOf(o));
 +  } else if (o instanceof Integer) {
 +Integer value = (Integer) o;
 +return Long.valueOf((Integer) value);
 +  } else if (o instanceof Long) {
 +return (Long) o;
 +  } else {
 +return defaultValue;
 +  }
 +}
  
  public static boolean getBoolean(Object o, boolean defaultValue) {
if (null == o) {
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak

2014-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14197697#comment-14197697
 ] 

ASF GitHub Bot commented on STORM-378:
--

Github user HeartSaVioR commented on the pull request:

https://github.com/apache/storm/pull/295#issuecomment-61767078
  
To make it clear, PR points to increase/decrease sleep time with streak.
Fixed sleep time is configurable so it doesn't matter how long it is, 
optimal value should be vary for workload.

I think we can make new ISpoutWaitStrategy implementation that play with 
streak if we really need it.
We definitely need wait strategy to always sleep same time (ex. 1ms), so it 
isn't a good idea to change existing class's behavior.


 SleepSpoutWaitStrategy.emptyEmit should use  the variable streak
 --

 Key: STORM-378
 URL: https://issues.apache.org/jira/browse/STORM-378
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: caofangkun
Priority: Minor

 {code:java}
 Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
 ===
 --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (revision 2868)
 +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (working copy)
 @@ -18,6 +18,8 @@
  package backtype.storm.spout;
  
  import backtype.storm.Config;
 +import backtype.storm.utils.Utils;
 +
  import java.util.Map;
  
  
 @@ -27,13 +29,14 @@
  
  @Override
  public void prepare(Map conf) {
 -sleepMillis = ((Number) 
 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
 +sleepMillis = Utils.getLong(
 +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 
 500);
  }
  
  @Override
  public void emptyEmit(long streak) {
  try {
 -Thread.sleep(sleepMillis);
 +Thread.sleep(Math.abs(sleepMillis + streak));
  } catch (InterruptedException e) {
  throw new RuntimeException(e);
  }
 Index: src/jvm/backtype/storm/utils/Utils.java
 ===
 --- src/jvm/backtype/storm/utils/Utils.java   (revision 2888)
 +++ src/jvm/backtype/storm/utils/Utils.java   (working copy)
 @@ -325,6 +325,24 @@
throw new IllegalArgumentException(Don't know how to convert  + 
 o +  + to int);
}
  }
 +
 +public static Long getLong(Object o, long defaultValue) {
 +
 +  if (o == null) {
 +return defaultValue;
 +  }
 +
 +  if (o instanceof String) {
 +return Long.valueOf(String.valueOf(o));
 +  } else if (o instanceof Integer) {
 +Integer value = (Integer) o;
 +return Long.valueOf((Integer) value);
 +  } else if (o instanceof Long) {
 +return (Long) o;
 +  } else {
 +return defaultValue;
 +  }
 +}
  
  public static boolean getBoolean(Object o, boolean defaultValue) {
if (null == o) {
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (STORM-378) SleepSpoutWaitStrategy.emptyEmit should use the variable streak

2014-10-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14183698#comment-14183698
 ] 

ASF GitHub Bot commented on STORM-378:
--

Github user HeartSaVioR commented on the pull request:

https://github.com/apache/storm/pull/295#issuecomment-60460916
  
@caofangkun 
It's mk-threads in executor.clj.

```
(if (and (= curr-count (.get emitted-count)) active?)
  (do (.increment empty-emit-streak)
  (.emptyEmit spout-wait-strategy (.get empty-emit-streak)))
  (.set empty-emit-streak 0)
  ))   
```

You can find that streak gets increased by 1, so I think it's for 
alternative implementation of ISpoutWaitStrategy, not SleepSpoutWaitStrategy.
(@nathanmarz Could you please confirm it?)
If it is for, just adding it to sleepMillis barely affects sleep time.
Streak should be multiplied by 10 or something bigger, maybe we can apply 
exponential value of already multiplied streak.


 SleepSpoutWaitStrategy.emptyEmit should use  the variable streak
 --

 Key: STORM-378
 URL: https://issues.apache.org/jira/browse/STORM-378
 Project: Apache Storm
  Issue Type: Bug
Affects Versions: 0.9.2-incubating
Reporter: caofangkun
Priority: Minor

 {code:java}
 Index: src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java
 ===
 --- src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (revision 2868)
 +++ src/jvm/backtype/storm/spout/SleepSpoutWaitStrategy.java  (working copy)
 @@ -18,6 +18,8 @@
  package backtype.storm.spout;
  
  import backtype.storm.Config;
 +import backtype.storm.utils.Utils;
 +
  import java.util.Map;
  
  
 @@ -27,13 +29,14 @@
  
  @Override
  public void prepare(Map conf) {
 -sleepMillis = ((Number) 
 conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS)).longValue();
 +sleepMillis = Utils.getLong(
 +conf.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS), 
 500);
  }
  
  @Override
  public void emptyEmit(long streak) {
  try {
 -Thread.sleep(sleepMillis);
 +Thread.sleep(Math.abs(sleepMillis + streak));
  } catch (InterruptedException e) {
  throw new RuntimeException(e);
  }
 Index: src/jvm/backtype/storm/utils/Utils.java
 ===
 --- src/jvm/backtype/storm/utils/Utils.java   (revision 2888)
 +++ src/jvm/backtype/storm/utils/Utils.java   (working copy)
 @@ -325,6 +325,24 @@
throw new IllegalArgumentException(Don't know how to convert  + 
 o +  + to int);
}
  }
 +
 +public static Long getLong(Object o, long defaultValue) {
 +
 +  if (o == null) {
 +return defaultValue;
 +  }
 +
 +  if (o instanceof String) {
 +return Long.valueOf(String.valueOf(o));
 +  } else if (o instanceof Integer) {
 +Integer value = (Integer) o;
 +return Long.valueOf((Integer) value);
 +  } else if (o instanceof Long) {
 +return (Long) o;
 +  } else {
 +return defaultValue;
 +  }
 +}
  
  public static boolean getBoolean(Object o, boolean defaultValue) {
if (null == o) {
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)