[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135337335
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java ---
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+public interface IConfigLoader {
+
+/**
+ * Load scheduler configs
+ * It first retrieves parameters like the location of the scheduler config file from the conf;
+ * and then get the actual scheduler configs associated with the configKey from that location.
+ * @param configKey The key from which we want to get the scheduler config.
+ * @return The scheduler configs
+ */
+Map load(String configKey);
+
+String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri";
+String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = "scheduler.config.loader.polltime.secs";
+String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = "scheduler.config.loader.timeout.secs";
+
+int DEFAULT_POLLTIME_SECS = 600;
+int DEFAULT_TIMEOUT_SECONDS = 10;
+
+static IConfigLoader getConfigLoader(Map conf) {
+Map loaderParams = (Map) conf.get(DaemonConfig.SCHEDULER_USER_POOLS_LOADER_PARAMS);
+if (loaderParams != null) {
+String uriString = (String) loaderParams.get(SCHEDULER_CONFIG_LOADER_URI);
+if (uriString != null) {
+try {
+URI uri = new URI(uriString);
+String scheme = uri.getScheme();
+switch (SchemeType.toSchemeType(scheme)) {
+case FILE:
+return new FileConfigLoader(loaderParams);
--- End diff --

This seems like a great idea to me. I will try to do so. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135334556
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java ---
@@ -0,0 +1,138 @@
+static IConfigLoader getConfigLoader(Map conf) {
+Map loaderParams = (Map) conf.get(DaemonConfig.SCHEDULER_USER_POOLS_LOADER_PARAMS);
+if (loaderParams != null) {
+String uriString = (String) loaderParams.get(SCHEDULER_CONFIG_LOADER_URI);
+if (uriString != null) {
+try {
+URI uri = new URI(uriString);
+String scheme = uri.getScheme();
+switch (SchemeType.toSchemeType(scheme)) {
+case FILE:
+return new FileConfigLoader(loaderParams);
--- End diff --

Actually, I was thinking we might want to use a factory pattern so we can cache connections and other things in the IConfigLoader implementation. Something like this, perhaps:

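```java
import java.net.URI;
import java.util.Map;
import java.util.ServiceLoader;

public interface IConfigLoaderFactory {
    /** Returns a loader for this URI, or null if this factory does not handle its scheme. */
    IConfigLoader createIfSupported(URI uri, Map<String, Object> conf);

    /** Asks every factory registered through ServiceLoader and returns the first loader offered. */
    static IConfigLoader open(URI uri, Map<String, Object> conf) {
        // Interfaces cannot hold private static fields, so the ServiceLoader is created per call.
        for (IConfigLoaderFactory factory : ServiceLoader.load(IConfigLoaderFactory.class)) {
            IConfigLoader ret = factory.createIfSupported(uri, conf);
            if (ret != null) {
                return ret;
            }
        }
        throw new IllegalArgumentException("No IConfigLoaderFactory supports " + uri);
    }
}

public class HttpArtifactoryServiceLoaderFactory implements IConfigLoaderFactory {
    @Override
    public IConfigLoader createIfSupported(URI uri, Map<String, Object> conf) {
        if (!"artifactory+http".equals(uri.getScheme())) {
            return null; // not our scheme; let another factory handle it
        }
        return new ...; // construct the Artifactory-backed IConfigLoader (elided in the original)
    }
}
```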
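A minimal sketch of the kind of caching this factory pattern makes possible, assuming a loader that keeps its last result and only re-fetches once a poll interval (compare `DEFAULT_POLLTIME_SECS` in the diff) has elapsed. `CachingConfigLoader`, its constructor, and the injected clock are hypothetical names for illustration only, not part of the PR or of Storm's API.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical loader that reuses its last result until the poll interval has passed. */
public class CachingConfigLoader {
    private final Supplier<Map<String, Object>> fetcher; // e.g. an HTTP GET over a reused connection
    private final long pollTimeMillis;
    private final LongSupplier clockMillis;
    private Map<String, Object> cached;
    private long lastFetchMillis;

    public CachingConfigLoader(Supplier<Map<String, Object>> fetcher, long pollTimeSecs,
                               LongSupplier clockMillis) {
        this.fetcher = fetcher;
        this.pollTimeMillis = pollTimeSecs * 1000L;
        this.clockMillis = clockMillis;
    }

    /** Returns the cached config, calling the fetcher only when nothing is cached or it is stale. */
    public synchronized Map<String, Object> load() {
        long now = clockMillis.getAsLong();
        if (cached == null || now - lastFetchMillis >= pollTimeMillis) {
            cached = fetcher.get();
            lastFetchMillis = now;
        }
        return cached;
    }
}
```

Injecting the clock keeps the sketch testable; a real loader would presumably read the system clock and hold on to whatever connection its fetch needs.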




[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135331427
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java ---
@@ -0,0 +1,138 @@
+static IConfigLoader getConfigLoader(Map conf) {
+Map loaderParams = (Map) conf.get(DaemonConfig.SCHEDULER_USER_POOLS_LOADER_PARAMS);
+if (loaderParams != null) {
+String uriString = (String) loaderParams.get(SCHEDULER_CONFIG_LOADER_URI);
+if (uriString != null) {
+try {
+URI uri = new URI(uriString);
+String scheme = uri.getScheme();
+switch (SchemeType.toSchemeType(scheme)) {
+case FILE:
+return new FileConfigLoader(loaderParams);
--- End diff --

I had been thinking about using a ServiceLoader and tried it. But I realized that we actually want only one type of loader working at a time (one loader per type of scheme), so I think the `switch-case` here is cleaner.

One way I can think of to apply ServiceLoader here is to have every type of loader check the `scheme` in its `load()` function and then either return `null` directly (because it doesn't handle this type of `scheme`) or return the result. Something like:

For HttpConfigLoader:
```java
Map load(Map loaderParams) {
    String scheme = getScheme();
    // Decline schemes this loader does not handle so another loader can try.
    if (!"http".equalsIgnoreCase(scheme)) {
        return null;
    }
    //else: process and return result;
}
```
For ArtifactoryHttpConfigLoader:
```java
Map load(Map loaderParams) {
    String scheme = getScheme();
    // Decline schemes this loader does not handle so another loader can try.
    if (!"artifactory+http".equalsIgnoreCase(scheme)) {
        return null;
    }
    //else: process and return result;
}
```
Is this what you had in mind? Sorry if I misunderstood.
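For comparison, the `switch-case` dispatch described above can be sketched as follows. This is an illustration under assumptions: the `SchemeType` enum here is hypothetical (the PR's own `SchemeType` is not shown in the diff), and `chooseLoader` returns a loader name instead of constructing a loader.

```java
import java.net.URI;
import java.util.Locale;

public class SchemeDispatchSketch {
    /** Hypothetical scheme enum; the PR's real SchemeType may differ. */
    enum SchemeType {
        FILE, HTTP, ARTIFACTORY_HTTP, UNSUPPORTED;

        static SchemeType toSchemeType(String scheme) {
            if (scheme == null) {
                return UNSUPPORTED; // relative URIs have no scheme
            }
            switch (scheme.toLowerCase(Locale.ROOT)) {
                case "file":
                    return FILE;
                case "http":
                    return HTTP;
                case "artifactory+http":
                    return ARTIFACTORY_HTTP;
                default:
                    return UNSUPPORTED;
            }
        }
    }

    /** Exactly one loader kind per scheme; returns its name, or null if the scheme is unsupported. */
    public static String chooseLoader(String uriString) {
        URI uri = URI.create(uriString); // throws IllegalArgumentException on malformed input
        switch (SchemeType.toSchemeType(uri.getScheme())) {
            case FILE:
                return "FileConfigLoader";
            case HTTP:
                return "HttpConfigLoader";
            case ARTIFACTORY_HTTP:
                return "ArtifactoryHttpConfigLoader";
            default:
                return null;
        }
    }
}
```

The trade-off being discussed: the switch keeps every supported scheme in one place, while the ServiceLoader approach lets new schemes be added without touching this code.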




[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135317773
  
--- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java ---
@@ -0,0 +1,263 @@
+
+package org.apache.storm.loadgen;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce it.
+ */
+public class InputStream implements Serializable {
+private static final Logger LOG = LoggerFactory.getLogger(InputStream.class);
+public final String fromComponent;
+public final String toComponent;
+public final String id;
+public final NormalDistStats execTime;
+public final NormalDistStats processTime;
+public final GroupingType groupingType;
+//Cached GlobalStreamId
+private GlobalStreamId gsid = null;
+
+/**
+ * Create an output stream from a config.
+ * @param conf the config to read from.
+ * @return the read OutputStream.
+ */
+public static InputStream fromConf(Map conf) {
+String component = (String) conf.get("from");
+String toComp = (String) conf.get("to");
+NormalDistStats execTime = NormalDistStats.fromConf((Map) conf.get("execTime"));
+NormalDistStats processTime = NormalDistStats.fromConf((Map) conf.get("processTime"));
+Map grouping = (Map) conf.get("grouping");
+GroupingType groupingType = GroupingType.fromConf((String) grouping.get("type"));
+String streamId = (String) grouping.getOrDefault("streamId", "default");
+return new InputStream(component, toComp, streamId, execTime, processTime, groupingType);
+}
+
+/**
+ * Convert this to a conf.
+ * @return the conf.
+ */
+public Map toConf() {
+Map ret = new HashMap<>();
+ret.put("from", fromComponent);
+ret.put("to", toComponent);
+ret.put("execTime", execTime.toConf());
+ret.put("processTime", processTime.toConf());
+
+Map grouping = new HashMap<>();
+grouping.put("streamId", id);
+grouping.put("type", groupingType.toConf());
+ret.put("grouping", grouping);
+
+return ret;
+}
+
+public static class Builder {
+private String fromComponent;
+private String toComponent;
+private String id;
+private NormalDistStats execTime;
+private NormalDistStats processTime;
+private GroupingType groupingType = GroupingType.SHUFFLE;
+
+public String getFromComponent() {
--- End diff --

Ahh, sorry. I missed their use.
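The `fromConf`/`toConf` pair in the quoted diff defines a nested-map format (`from`, `to`, and a `grouping` map holding `type` and `streamId`). A minimal round-trip sketch of that format follows; `StreamConfSketch` is a hypothetical, simplified stand-in that keeps the grouping type as a plain string and omits the `NormalDistStats` fields.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for InputStream's conf format. */
public class StreamConfSketch {
    public final String fromComponent;
    public final String toComponent;
    public final String streamId;
    public final String groupingType; // the class in the diff uses a GroupingType enum instead

    public StreamConfSketch(String fromComponent, String toComponent, String streamId, String groupingType) {
        this.fromComponent = fromComponent;
        this.toComponent = toComponent;
        this.streamId = streamId;
        this.groupingType = groupingType;
    }

    @SuppressWarnings("unchecked")
    public static StreamConfSketch fromConf(Map<String, Object> conf) {
        Map<String, Object> grouping = (Map<String, Object>) conf.get("grouping");
        return new StreamConfSketch(
            (String) conf.get("from"),
            (String) conf.get("to"),
            // As in the diff, a missing streamId means the "default" stream.
            (String) grouping.getOrDefault("streamId", "default"),
            (String) grouping.get("type"));
    }

    public Map<String, Object> toConf() {
        Map<String, Object> grouping = new HashMap<>();
        grouping.put("streamId", streamId);
        grouping.put("type", groupingType);
        Map<String, Object> ret = new HashMap<>();
        ret.put("from", fromComponent);
        ret.put("to", toComponent);
        ret.put("grouping", grouping);
        return ret;
    }
}
```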





[GitHub] storm issue #2249: STORM-2648/STORM-2357: Add storm-kafka-client support for...

2017-08-25 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2249
  
Still +1




[GitHub] storm issue #2289: STORM-2702: storm-loadgen

2017-08-25 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2289
  
@knusbaum I think I have addressed all of your review comments so far.




[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135307748
  
--- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java ---
@@ -0,0 +1,263 @@
+public static class Builder {
+private String fromComponent;
+private String toComponent;
+private String id;
+private NormalDistStats execTime;
+private NormalDistStats processTime;
+private GroupingType groupingType = GroupingType.SHUFFLE;
+
+public String getFromComponent() {
--- End diff --

No, they don't, but it was the most convenient way to pull out the needed information while still building the object.
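A minimal sketch of the pattern described above, where getters on a builder let calling code read partially built state (here, to compute a lookup key) before `build()` is called. The classes and the `from:id` key format are hypothetical and only loosely modeled on `InputStream.Builder`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BuilderGettersSketch {
    /** Hypothetical immutable result type. */
    public static final class Edge {
        public final String from;
        public final String to;
        public final String id;

        Edge(String from, String to, String id) {
            this.from = from;
            this.to = to;
            this.id = id;
        }
    }

    /** Hypothetical builder whose getters expose state before build() is called. */
    public static final class Builder {
        private String fromComponent;
        private String toComponent;
        private String id = "default";

        public Builder withFromComponent(String fromComponent) {
            this.fromComponent = fromComponent;
            return this;
        }

        public Builder withToComponent(String toComponent) {
            this.toComponent = toComponent;
            return this;
        }

        public Builder withId(String id) {
            this.id = id;
            return this;
        }

        public String getFromComponent() {
            return fromComponent;
        }

        public String getId() {
            return id;
        }

        public Edge build() {
            return new Edge(fromComponent, toComponent, id);
        }
    }

    /** Uses the getters to derive a lookup key while the objects are still being built. */
    public static Map<String, Edge> buildIndexed(List<Builder> builders) {
        Map<String, Edge> ret = new HashMap<>();
        for (Builder b : builders) {
            ret.put(b.getFromComponent() + ":" + b.getId(), b.build());
        }
        return ret;
    }
}
```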




[GitHub] storm issue #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on the issue:

https://github.com/apache/storm/pull/2289
  
Partial review done.





[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135084038
  
--- Diff: examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java ---
@@ -0,0 +1,468 @@
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Capture running topologies for load gen later on.
+ */
+public class CaptureLoad {
+private static final Logger LOG = LoggerFactory.getLogger(CaptureLoad.class);
+public static final String DEFAULT_OUT_DIR = "./loadgen/";
+
+private static List extractBoltValues(List summaries,
+  GlobalStreamId id,
+  Function>> func) {
+
+List ret = new ArrayList<>();
+if (summaries != null) {
+for (ExecutorSummary summ : summaries) {
+if (summ != null && summ.is_set_stats()) {
+Map> data = func.apply(summ.get_stats().get_specific().get_bolt());
+if (data != null) {
+List subvalues = data.values().stream()
+.map((subMap) -> subMap.get(id))
+.filter((value) -> value != null)
+.mapToDouble((value) -> value.doubleValue())
+.boxed().collect(Collectors.toList());
+ret.addAll(subvalues);
+}
+}
+}
+}
+return ret;
+}
+
+static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary topologySummary) throws Exception {
+String topologyName = topologySummary.get_name();
+LOG.info("Capturing {}...", topologyName);
+String topologyId = topologySummary.get_id();
+TopologyInfo info = client.getTopologyInfo(topologyId);
+TopologyPageInfo tpinfo = client.getTopologyPageInfo(topologyId, ":all-time", false);
+@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
+StormTopology topo = client.getUserTopology(topologyId);
+//Done capturing topology information...
+
+Map savedTopoConf = new HashMap<>();
+Map topoConf = 

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135073224
  
--- Diff: examples/storm-loadgen/README.md ---
@@ -0,0 +1,195 @@
+# Storm Load Generation Tools
+
+A set of tools to place an artificial load on a storm cluster to compare against a different storm cluster.  This is particularly helpful when making changes to the data path in storm to see what if any impact the changes had.  This is also useful for end users that want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies is going to be more accurate.
+
+## Methodology
+The idea behind all of these tools is to measure the trade-offs between latency, throughput, and cost when processing data using Apache Storm.
+
+When processing data you typically will know a few things.  First you will know about how much data you are going to be processing.  This will typically be a range of values that change throughput the day.  You also will have an idea of how quickly you need the data processed by.  Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.  This is because of most use cases the value of the data declines over time, and being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to be able to process this data.  There are always trade-offs in how quickly you can process some data and how efficiently you can processes that data both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
--- End diff --

A couple of nits:

1. `the some`:
```
Often this is measured in terms of the latency it takes to process data at the some percentile or set of percentiles.
```

2. *in* most use cases? *for* most use cases?
```
This is because of most use cases ...
```
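The quoted methodology frames latency targets in terms of a percentile. As a sketch of one common definition (nearest rank), and not the loadgen tool's own implementation, which is not shown in this diff:

```java
import java.util.Arrays;

public class LatencyPercentileSketch {
    /**
     * Nearest-rank percentile: the smallest sample such that at least p percent
     * of all samples are less than or equal to it.
     */
    public static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need samples and 0 < p <= 100");
        }
        long[] sorted = latenciesMs.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        return sorted[rank - 1];
    }
}
```

With ten samples `1..10`, the 50th percentile is `5`, the 90th is `9`, and the 99th rounds up to the largest sample, `10`.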




[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135074782
  
--- Diff: examples/storm-loadgen/README.md ---
@@ -0,0 +1,195 @@
+A note on how latency is measured.  Storm typically measures latency from when a message is emitted by a spout until the point it is fully acked or failed (in many versions of storm it actually does this in the acker instead of the spout so it is trying to be a measure of how long it takes for the actual processing, removing as much of the acker overhead as possible).  For these tools we do it differently.  We simulate a throughput and measure the start time of the tuple from when it would have been emitted if the topology could keep up with the load.  In the normal case this should not be an issue, but if the topology cannot keep up with the throughput you will see the latency grow very high compared to the latency reported by storm.
+
+## Tools
+### CaptureLoad 
+
+`CaptureLoad` will look at the topologies on a running cluster and store the structure of and metrics about each of theses topologies storing them in a format that can be used later to reproduce a similar load on the cluster.
+
+ Usage
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.CaptureLoad [options] [topologyName]*
+```
+|Option| Description|
+|-|-|
+|-a,--anonymize | Strip out any possibly identifiable information|
+| -h,--help | Print a help message |
+| -o,--output-dir  | Where to write (defaults to ./loadgen/)|
+
+ Limitations
+This is still a work in progress.  It does not currently capture CPU or memory usage of a topology.  Resource requests (used by RAS when scheduling) within the topology are also not captured yet, nor is the user that actually ran the topology.
+
+### GenLoad
+
+`GenLoad` will take the files produced by `CaptureLoad` and replay them in a simulated way on a cluster.  It also offers lots of ways to capture metrics about those simulated topologies to be able to compare different software versions of different hardware setups.  You can also make adjustments to the topology before submitting it to change the size or throughput of the topology.
+
+### Usage
+```
+storm jar storm-loadgen.jar org.apache.storm.loadgen.GenLoad [options] [capture_file]*
+```
+
+|Option| Description|
+|-|-|
+| --debug | Print debug information about the adjusted topology before submitting it. |
+|-h,--help | Print a help message |
+| --local-or-shuffle | Replace shuffle grouping with local or shuffle grouping. |
+| --parallel  | How much to scale the topology up or down in parallelism. The new parallelism will round up to the next whole number. If a topology + component is supplied only that component will be scaled. If topo or component is blank or a `'*'` all topologies or components matched the other part will be scaled. Only 1 scaling rule, the most specific, will be applied to a component. Providing a topology name is considered more specific than not providing one. (defaults to 1.0 no scaling) |
+| -r,--report-interval  | How long in between reported metrics.  Will be rounded up to the next 10 sec boundary. default 30 |
+| --reporter   | Provide the co
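The `--parallel` rule in the table above can be sketched as follows. This assumes each rule is a (topology, component, factor) triple in which a blank or `'*'` part acts as a wildcard, and it reads "providing a topology name is considered more specific than not providing one" as ranking a topology-only rule above a component-only rule. `Rule` and `scale` are hypothetical names, not GenLoad's code; ties between equally specific rules go to the first one listed.

```java
import java.util.ArrayList;
import java.util.List;

class ParallelismScaling {
    /** Hypothetical representation of one --parallel rule. */
    static final class Rule {
        final String topo;
        final String component;
        final double factor;

        Rule(String topo, String component, double factor) {
            this.topo = topo;
            this.component = component;
            this.factor = factor;
        }

        // Blank, null, or '*' matches anything
        private static boolean isWildcard(String s) {
            return s == null || s.isEmpty() || s.equals("*");
        }

        boolean matches(String topoName, String compName) {
            return (isWildcard(topo) || topo.equals(topoName))
                && (isWildcard(component) || component.equals(compName));
        }

        /** A named topology outranks a named component; both together rank highest. */
        int specificity() {
            return (isWildcard(topo) ? 0 : 2) + (isWildcard(component) ? 0 : 1);
        }
    }

    /** Applies only the most specific matching rule, rounding up to a whole number. */
    static int scale(int parallelism, String topoName, String compName, List<Rule> rules) {
        Rule best = null;
        for (Rule r : rules) {
            if (r.matches(topoName, compName) && (best == null || r.specificity() > best.specificity())) {
                best = r;
            }
        }
        double factor = best == null ? 1.0 : best.factor; // default: no scaling
        return (int) Math.ceil(parallelism * factor);
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule("*", "*", 2.0));
        rules.add(new Rule("*", "split", 0.5));
        rules.add(new Rule("wordcount", "*", 1.5));
        System.out.println(scale(3, "wordcount", "split", rules)); // 5: (topo, *) beats (*, comp)
        System.out.println(scale(3, "other", "split", rules));     // 2: ceil(1.5)
        System.out.println(scale(3, "other", "count", rules));     // 6
    }
}
```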

[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135078488
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java 
---
@@ -0,0 +1,468 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.loadgen;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.ComponentCommon;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologyPageInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.WorkerSummary;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.ObjectReader;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+import org.json.simple.parser.JSONParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Capture running topologies for load gen later on.
+ */
+public class CaptureLoad {
+private static final Logger LOG = 
LoggerFactory.getLogger(CaptureLoad.class);
+public static final String DEFAULT_OUT_DIR = "./loadgen/";
+
+private static List extractBoltValues(List 
summaries,
+  GlobalStreamId id,
+  Function>> func) {
+
+List ret = new ArrayList<>();
+if (summaries != null) {
+for (ExecutorSummary summ : summaries) {
+if (summ != null && summ.is_set_stats()) {
+Map> data = 
func.apply(summ.get_stats().get_specific().get_bolt());
+if (data != null) {
+List subvalues = data.values().stream()
+.map((subMap) -> subMap.get(id))
+.filter((value) -> value != null)
+.mapToDouble((value) -> value.doubleValue())
+.boxed().collect(Collectors.toList());
--- End diff --

I think the `mapToDouble` and the `boxed` are unnecessary. 
They are inverse operations, first taking a `Stream` from 
`filter(...)` and making a `DoubleStream`, then reboxing and turning the 
`DoubleStream` into a `Stream`
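For comparison, a minimal sketch of the single-step alternative the comment hints at. It assumes the inner map values are boxed `Long`s (the stripped generics in the quoted diff don't show the exact type) and uses `String` keys instead of `GlobalStreamId` so it stays self-contained; if the values are already `Double`, the conversion step can be dropped entirely.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

class BoltValueExtraction {
    /**
     * Collects the value recorded for one stream id across all time windows.
     * Assumption: window -> (stream id -> Long); the real code keys by GlobalStreamId.
     */
    static List<Double> extractValues(Map<String, Map<String, Long>> data, String id) {
        return data.values().stream()
            .map(subMap -> subMap.get(id))   // Stream<Long>, may contain nulls
            .filter(Objects::nonNull)        // skip windows with no entry for this id
            .map(Long::doubleValue)          // one boxed-to-boxed conversion,
            .collect(Collectors.toList());   // instead of mapToDouble(...).boxed()
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> data = new LinkedHashMap<>();
        Map<String, Long> tenMin = new LinkedHashMap<>();
        tenMin.put("s", 1L);
        tenMin.put("t", 5L);
        data.put("600", tenMin);
        Map<String, Long> allTime = new LinkedHashMap<>();
        allTime.put("s", 2L);
        data.put(":all-time", allTime);
        System.out.println(extractValues(data, "s")); // [1.0, 2.0]
    }
}
```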


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135074180
  
--- Diff: examples/storm-loadgen/README.md ---
@@ -0,0 +1,195 @@
+# Storm Load Generation Tools
+
+A set of tools to place an artificial load on a storm cluster to compare against a different storm cluster.  This is particularly helpful when making changes to the data path in storm to see what impact, if any, the changes had.  This is also useful for end users that want to compare different hardware setups to see what the trade-offs are, although actually running your real topologies is going to be more accurate.
+
+## Methodology
+The idea behind all of these tools is to measure the trade-offs between 
latency, throughput, and cost when processing data using Apache Storm.
+
+When processing data you typically will know a few things.  First you will know about how much data you are going to be processing.  This will typically be a range of values that change throughout the day.  You also will have an idea of how quickly you need the data processed.  Often this is measured in terms of the latency it takes to process data at some percentile or set of percentiles.  This is because in most use cases the value of the data declines over time, and being able to react to the data quickly is more valuable.  You probably also have a budget for how much you are willing to spend to be able to process this data.  There are always trade-offs in how quickly you can process some data and how efficiently you can process that data, both in terms of resource usage (cost) and latency.  These tools are designed to help you explore that space.
+
+A note on how latency is measured.  Storm typically measures latency from when a message is emitted by a spout until the point it is fully acked or failed (in many versions of storm it actually does this in the acker instead of the spout, so it is trying to be a measure of how long it takes for the actual processing, removing as much of the acker overhead as possible).  For these tools we do it differently.  We simulate a throughput and measure the start time of the tuple from when it would have been emitted if the topology could keep up with the load.  In the normal case this should not be an issue, but if the topology cannot keep up with the throughput you will see the latency grow very high compared to the latency reported by storm.
+
+## Tools
+### CaptureLoad 
+
+`CaptureLoad` will look at the topologies on a running cluster and store 
the structure of and metrics about each of theses topologies storing them in a 
format that can be used later to reproduce a similar load on the cluster.
--- End diff --

Typo: `theses` should be `these`.
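Regarding the methodology in the README excerpt above, the sketch below contrasts, under simplifying assumptions, latency measured from the actual emit with latency measured from the time a tuple would have been emitted at the simulated rate, and adds a nearest-rank percentile over recorded samples. `LoadgenLatencySketch` and its methods are hypothetical names; the actual tools may schedule emits and aggregate latencies differently (for example with a histogram rather than sorting raw samples).

```java
import java.util.Arrays;

class LoadgenLatencySketch {
    /** Nanosecond time at which tuple n should have been emitted at a fixed target rate. */
    static long intendedEmitNanos(long startNanos, long n, double tuplesPerSecond) {
        return startNanos + (long) (n * 1_000_000_000L / tuplesPerSecond);
    }

    /** Nearest-rank percentile (0 < p <= 100): smallest sample with at least p% of samples <= it. */
    static long percentile(long[] samples, double p) {
        if (samples.length == 0 || p <= 0 || p > 100) {
            throw new IllegalArgumentException("need samples and 0 < p <= 100");
        }
        long[] sorted = samples.clone(); // leave the caller's array untouched
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // Target 1,000 tuples/sec: tuple #5 should go out 5 ms after start.
        long intended = intendedEmitNanos(0L, 5, 1_000.0);
        // Suppose the spout fell behind: emitted at 20 ms, fully acked at 22 ms.
        long actualEmit = 20_000_000L;
        long acked = 22_000_000L;
        System.out.println("emit-to-ack:     " + (acked - actualEmit) / 1_000_000 + " ms"); // 2 ms
        System.out.println("intended-to-ack: " + (acked - intended) / 1_000_000 + " ms");   // 17 ms

        long[] latenciesMs = {12, 3, 7, 45, 9, 5, 8, 6, 4, 10};
        System.out.println("p50=" + percentile(latenciesMs, 50) + " p99=" + percentile(latenciesMs, 99)); // p50=7 p99=45
    }
}
```

The second number is the one that keeps growing when the topology cannot keep up, because time spent waiting to be emitted is counted.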




[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135085622
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.storm.loadgen;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.storm.Config;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.loadgen.CaptureLoad;
+import org.apache.storm.utils.NimbusClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Estimate the throughput of all topologies.
+ */
+public class EstimateThroughput {
+private static final Logger LOG = 
LoggerFactory.getLogger(EstimateThroughput.class);
+
+/**
+ * Main entry point for estimate throughput command.
+ * @param args the command line arguments.
+ * @throws Exception on any error.
+ */
+public static void main(String[] args) throws Exception {
+Options options = new Options();
+options.addOption(Option.builder("h")
+.longOpt("help")
+.desc("Print a help message")
+.build());
+CommandLineParser parser = new DefaultParser();
+CommandLine cmd = null;
+ParseException pe = null;
+try {
+cmd = parser.parse(options, args);
+} catch (ParseException e) {
+pe = e;
+}
+if (pe != null || cmd.hasOption('h')) {
+if (pe != null) {
+System.err.println("ERROR " + pe.getMessage());
--- End diff --

(same)




[GitHub] storm pull request #2289: STORM-2702: storm-loadgen

2017-08-25 Thread knusbaum
Github user knusbaum commented on a diff in the pull request:

https://github.com/apache/storm/pull/2289#discussion_r135086409
  
--- Diff: 
examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/InputStream.java 
---
@@ -0,0 +1,263 @@
+
+package org.apache.storm.loadgen;
+
+import java.io.ByteArrayInputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.grouping.PartialKeyGrouping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A set of measurements about a stream so we can statistically reproduce 
it.
+ */
+public class InputStream implements Serializable {
+private static final Logger LOG = 
LoggerFactory.getLogger(InputStream.class);
+public final String fromComponent;
+public final String toComponent;
+public final String id;
+public final NormalDistStats execTime;
+public final NormalDistStats processTime;
+public final GroupingType groupingType;
+//Cached GlobalStreamId
+private GlobalStreamId gsid = null;
+
+/**
+ * Create an input stream from a config.
+ * @param conf the config to read from.
+ * @return the read InputStream.
+ */
+public static InputStream fromConf(Map conf) {
+String component = (String) conf.get("from");
+String toComp = (String) conf.get("to");
+NormalDistStats execTime = NormalDistStats.fromConf((Map) conf.get("execTime"));
+NormalDistStats processTime = 
NormalDistStats.fromConf((Map) conf.get("processTime"));
+Map grouping = (Map) 
conf.get("grouping");
+GroupingType groupingType = GroupingType.fromConf((String) 
grouping.get("type"));
+String streamId = (String) grouping.getOrDefault("streamId", 
"default");
+return new InputStream(component, toComp, streamId, execTime, 
processTime, groupingType);
+}
+
+/**
+ * Convert this to a conf.
+ * @return the conf.
+ */
+public Map toConf() {
+Map ret = new HashMap<>();
+ret.put("from", fromComponent);
+ret.put("to", toComponent);
+ret.put("execTime", execTime.toConf());
+ret.put("processTime", processTime.toConf());
+
+Map grouping = new HashMap<>();
+grouping.put("streamId", id);
+grouping.put("type", groupingType.toConf());
+ret.put("grouping", grouping);
+
+return ret;
+}
+
+public static class Builder {
+private String fromComponent;
+private String toComponent;
+private String id;
+private NormalDistStats execTime;
+private NormalDistStats processTime;
+private GroupingType groupingType = GroupingType.SHUFFLE;
+
+public String getFromComponent() {
--- End diff --

It doesn't really bother me, but do builders usually have getters?
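For reference, a common shape for a builder is fluent setters plus a single `build()` and no getters, since callers read values from the built object instead. The sketch below is hypothetical: `StreamSpec` trims the fields to a few `String`s and is not the PR's `InputStream.Builder`.

```java
import java.util.Objects;

class StreamSpec {
    final String fromComponent;
    final String toComponent;
    final String id;

    private StreamSpec(Builder b) {
        this.fromComponent = b.fromComponent;
        this.toComponent = b.toComponent;
        this.id = b.id;
    }

    static final class Builder {
        private String fromComponent;
        private String toComponent;
        private String id = "default"; // same default stream id that fromConf falls back to

        Builder withFromComponent(String fromComponent) {
            this.fromComponent = fromComponent;
            return this;
        }

        Builder withToComponent(String toComponent) {
            this.toComponent = toComponent;
            return this;
        }

        Builder withId(String id) {
            this.id = id;
            return this;
        }

        /** Validates required fields once, at build time. */
        StreamSpec build() {
            Objects.requireNonNull(fromComponent, "fromComponent");
            Objects.requireNonNull(toComponent, "toComponent");
            return new StreamSpec(this);
        }
    }

    public static void main(String[] args) {
        StreamSpec spec = new StreamSpec.Builder()
            .withFromComponent("spout")
            .withToComponent("split")
            .build();
        System.out.println(spec.fromComponent + " -> " + spec.toComponent + " on " + spec.id);
    }
}
```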




[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135291998
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryHttpConfigLoader.java
 ---
@@ -0,0 +1,375 @@
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Time;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+/**
+ * A dynamic loader that can load scheduler configurations for user 
resource guarantees from Artifactory (an artifact repository manager).
+ */
+public class ArtifactoryHttpConfigLoader implements IConfigLoader {
+protected static final String ARTIFACTORY_BASE_DIRECTORY = 
"artifactory.config.loader.base.directory";
--- End diff --

If we have configs in here, can we tag them with the config validation 
annotations?
```
@isString
```

And also register it with the config service loader?
```
import org.apache.storm.validation.Validated;

...

public class ArtifactoryHttpConfigLoader implements IConfigLoader, Validated {
```
and then append 
`org.apache.storm.scheduler.utils.ArtifactoryHttpConfigLoader` to 
`./storm-server/src/main/resources/META-INF/services/org.apache.storm.validation.Validated`.
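Roughly, the annotations plus the `Validated` registration are useful because Storm's config validation scans registered classes, treats each `String` constant's value as a config key, and checks the configured value against that constant's annotations. The self-contained sketch below imitates that idea with a hypothetical `@IsString` annotation and `ConfigKeyValidator` class; it is an illustration only, not Storm's `ConfigValidation` code, and the real annotation named in the comment is `@isString`. For simplicity it only inspects public fields.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a validation annotation such as Storm's @isString. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface IsString {}

/** Hypothetical config holder: each annotated constant's value is a config key. */
class ExampleLoaderConfigs {
    @IsString
    public static final String BASE_DIRECTORY = "artifactory.config.loader.base.directory";
}

class ConfigKeyValidator {
    /** Returns the config keys whose values violate the annotation on their constant. */
    static List<String> validate(Class<?> configClass, Map<String, Object> conf) {
        List<String> badKeys = new ArrayList<>();
        for (Field field : configClass.getFields()) { // public fields only, in this sketch
            if (!Modifier.isStatic(field.getModifiers()) || field.getType() != String.class) {
                continue;
            }
            String key;
            try {
                key = (String) field.get(null); // the constant's value is the config key
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
            Object value = conf.get(key);
            // An unset key passes; a set key annotated @IsString must hold a String.
            if (field.isAnnotationPresent(IsString.class) && value != null && !(value instanceof String)) {
                badKeys.add(key);
            }
        }
        return badKeys;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ExampleLoaderConfigs.BASE_DIRECTORY, 42); // wrong type on purpose
        System.out.println(validate(ExampleLoaderConfigs.class, conf));
    }
}
```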




[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135290270
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java 
---
@@ -0,0 +1,138 @@
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+public interface IConfigLoader {
+
+/**
+ * Load scheduler configs
+ * It first retrieves parameters like the location of the scheduler 
config file from the conf;
+ * and then get the actual scheduler configs associated with the 
configKey from that location.
+ * @param configKey The key from which we want to get the scheduler 
config.
+ * @return The scheduler configs
+ */
+Map load(String configKey);
+
+String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri";
+String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = 
"scheduler.config.loader.polltime.secs";
+String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = 
"scheduler.config.loader.timeout.secs";
+
+int DEFAULT_POLLTIME_SECS = 600;
+int DEFAULT_TIMEOUT_SECONDS = 10;
+
+static IConfigLoader getConfigLoader(Map conf) {
+Map loaderParams = (Map) 
conf.get(DaemonConfig.SCHEDULER_USER_POOLS_LOADER_PARAMS);
+if (loaderParams != null) {
+String uriString = (String) 
loaderParams.get(SCHEDULER_CONFIG_LOADER_URI);
+if (uriString != null) {
+try {
+URI uri = new URI(uriString);
+String scheme = uri.getScheme();
+switch (SchemeType.toSchemeType(scheme)) {
+case FILE:
+return new FileConfigLoader(loaderParams);
--- End diff --

To me it feels like this would be a great place to use a ServiceLoader 
instead of having a hard-coded FileConfigLoader or ArtifactoryHttpConfigLoader.
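A rough sketch of the ServiceLoader approach: each loader implementation declares which URI schemes it handles, and the lookup picks the first match instead of switching over hard-coded classes. `ConfigLoaderFactory`, `FileLoaderFactory`, `HttpLoaderFactory`, and `ConfigLoaderRegistry` are hypothetical names, and a real factory would build an `IConfigLoader` rather than return a label. Implementations would be listed in a `META-INF/services` file named after the interface's fully qualified name; the lookup also accepts an explicit list so it can be exercised without one.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical SPI for scheme-specific scheduler config loaders. */
interface ConfigLoaderFactory {
    /** Whether this factory handles URIs with the given scheme (e.g. "file", "http"). */
    boolean canHandle(String scheme);

    /** Short label; a real factory would instead create the loader from its params. */
    String name();
}

class FileLoaderFactory implements ConfigLoaderFactory {
    public boolean canHandle(String scheme) { return "file".equals(scheme); }
    public String name() { return "file"; }
}

class HttpLoaderFactory implements ConfigLoaderFactory {
    public boolean canHandle(String scheme) { return "http".equals(scheme) || "https".equals(scheme); }
    public String name() { return "http"; }
}

class ConfigLoaderRegistry {
    /** Returns the first factory that claims the URI's scheme, if any. */
    static Optional<ConfigLoaderFactory> forUri(URI uri, Iterable<ConfigLoaderFactory> factories) {
        for (ConfigLoaderFactory factory : factories) {
            if (factory.canHandle(uri.getScheme())) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    /** Production path: discover factories on the classpath instead of hard-coding them. */
    static Optional<ConfigLoaderFactory> forUri(URI uri) {
        return forUri(uri, ServiceLoader.load(ConfigLoaderFactory.class));
    }

    public static void main(String[] args) {
        List<ConfigLoaderFactory> factories =
            Arrays.<ConfigLoaderFactory>asList(new FileLoaderFactory(), new HttpLoaderFactory());
        URI uri = URI.create("file:///etc/storm/user-pools.yaml");
        System.out.println(forUri(uri, factories).map(ConfigLoaderFactory::name).orElse("none")); // file
    }
}
```

A new backend then only needs a jar with its factory and a services entry, without touching the lookup code.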




[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-25 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r135292464
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java 
---
@@ -0,0 +1,138 @@
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+public interface IConfigLoader {
+
+/**
+ * Load scheduler configs
+ * It first retrieves parameters like the location of the scheduler 
config file from the conf;
+ * and then get the actual scheduler configs associated with the 
configKey from that location.
+ * @param configKey The key from which we want to get the scheduler 
config.
+ * @return The scheduler configs
+ */
+Map load(String configKey);
+
+String SCHEDULER_CONFIG_LOADER_URI = "scheduler.config.loader.uri";
+String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = 
"scheduler.config.loader.polltime.secs";
+String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = 
"scheduler.config.loader.timeout.secs";
--- End diff --

Here too, can we annotate these with config validation annotations, and make 
IConfigLoader part of the Config Validation service loader?




[GitHub] storm issue #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-25 Thread kamleshbhatt
Github user kamleshbhatt commented on the issue:

https://github.com/apache/storm/pull/2233
  
Thanks for all your help and guidance.





[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-25 Thread kamleshbhatt
Github user kamleshbhatt closed the pull request at:

https://github.com/apache/storm/pull/2233




[GitHub] storm issue #2199: [STORM-2201] Add dynamic scheduler configuration loading

2017-08-25 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2199
  
@revans2  Do you have any suggestions?  Thanks.




[GitHub] storm issue #2199: [STORM-2201] Add dynamic scheduler configuration loading

2017-08-25 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2199
  
@Ethanlm Thanks for the information. It looks like we can't expect users to 
use Artifactory. If possible, it would be better to have an HttpConfigLoader 
implementation, though it may not be easy to implement the same behavior as 
Artifactory, such as pulling the most recent file. We can start with just taking 
the full URL of the config file and pulling it periodically.
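The "full URL, pulled periodically" approach could look roughly like this: a cache that re-fetches only after the poll interval has elapsed (compare `DEFAULT_POLLTIME_SECS = 600` in the quoted `IConfigLoader`) and keeps the last good config when a fetch fails. The fetch is a hypothetical `Supplier` (for example, an HTTP GET of the URL plus a YAML parse, returning `null` on failure), and the clock is injected so the behavior is testable; this is a sketch, not the PR's implementation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class PollingConfigCache {
    private final Supplier<Map<String, Object>> fetcher; // hypothetical fetch; null on failure
    private final LongSupplier clockMillis;
    private final long pollMillis;
    private Map<String, Object> cached = Collections.emptyMap();
    private long lastFetchMillis;
    private boolean everFetched = false;

    PollingConfigCache(Supplier<Map<String, Object>> fetcher, LongSupplier clockMillis, long pollSecs) {
        this.fetcher = fetcher;
        this.clockMillis = clockMillis;
        this.pollMillis = pollSecs * 1000L;
    }

    /** Returns the cached config, refreshing it once the poll interval has elapsed. */
    synchronized Map<String, Object> get() {
        long now = clockMillis.getAsLong();
        if (!everFetched || now - lastFetchMillis >= pollMillis) {
            Map<String, Object> fresh = fetcher.get();
            if (fresh != null) {
                cached = fresh; // keep the last good config if this fetch failed
            }
            lastFetchMillis = now;
            everFetched = true;
        }
        return cached;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        int[] calls = {0};
        PollingConfigCache cache = new PollingConfigCache(
            () -> Collections.singletonMap("fetch", (Object) (++calls[0])),
            () -> now[0],
            600);
        cache.get();            // first call always fetches
        now[0] = 599_000L;
        cache.get();            // still inside the 600 s window: served from cache
        now[0] = 600_000L;
        System.out.println(cache.get()); // {fetch=2}
    }
}
```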




Some old pull requests worth to revisit

2017-08-25 Thread Jungtaek Lim
Hi devs,

I have been revisiting the PR pages and found some pull requests which are
really old but look worth revisiting. I already handled some of them, and it
would be great if someone could help review them.

* STORM-2083: Blacklist Scheduler
PR: https://github.com/apache/storm/pull/1674
The feature itself sounds great, and there are already many review comments
in the pull request. According to the author's todo list, the patch looks close
to its final state. We may be able to put some more effort into reviewing
and finalizing it. Merge conflicts can be handled after review.

* STORM-1083: Upgrade netty transport from 3.x to 4.x
PR: https://github.com/apache/storm/pull/1591
Netty 3.x reached EOL a long time ago, but we are still on 3.x. Unless a
performance hit is observed, it would be better to move to Netty 4.x so we
keep getting the benefits provided by the Netty team.

* STORM-2286: Storm Rebalance command should support arbitrary component
parallelism
PR: https://issues.apache.org/jira/browse/STORM-2286
It looks like Arun took a look before but lost track of it afterwards.

Please share if you find other issues/pull requests worth reviving.

Thanks,
Jungtaek Lim (HeartSaVioR)


[GitHub] storm issue #2249: STORM-2648/STORM-2357: Add storm-kafka-client support for...

2017-08-25 Thread srdo
Github user srdo commented on the issue:

https://github.com/apache/storm/pull/2249
  
@hmcl Are you reviewing this, or are you satisfied with it?




[GitHub] storm issue #2199: [STORM-2201] Add dynamic scheduler configuration loading

2017-08-25 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2199
  
@HeartSaVioR  You may want to check this 
[website](https://www.jfrog.com/artifactory/). My understanding is that we can 
get the most recent file from an Artifactory server, which is not the case with 
a normal HTTP server.




[GitHub] storm issue #2289: STORM-2702: storm-loadgen

2017-08-25 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/2289
  
I think I am done with new features unless someone else comes up with 
something that I missed. I really would appreciate some reviews. 
@knusbaum, if you have the time, I know you expressed interest in this patch.

@roshannaik, if you really want metrics about the emit to ack/fail latency, I 
can add those in too, but they only become useful if the topology cannot keep 
up with the load placed on it.




Re: [DISCUSS] Release Storm 1.0.5 / 1.1.2

2017-08-25 Thread Alexandre Vermeerbergen
Hello,

Would you please include https://issues.apache.org/jira/browse/STORM-2648
in the upcoming Storm 1.1.2 release? We need latency and other stats when
using the Storm Kafka Client spout in autocommit mode; not having this feature
is blocking us from moving from the old Storm-Kafka lib (limited to Kafka
0.9.x) to the Storm-Kafka-Client lib (required for Kafka 0.10.x compatibility).

Best regards,
Alexandre




2017-08-25 9:26 GMT+02:00 Jungtaek Lim :

> Hi devs,
>
> We received a bug report (STORM-2682) on Storm 1.0.4 and
> 1.1.1 which prevents a Storm cluster from being updated. Personally, it looks
> pretty critical, and hopefully it is fixed now.
> So maybe we would like to have another round of bug fix releases quickly for
> the affected 1.x version lines. What do you think?
>
> Also, please list any bug fix issues you would like to include in the new
> bug fix releases, so that we can create epic issues and track them to make
> the releases happen sooner.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>


Re: Still getting "Internal error processing getClusterInfo" with Storm 1.1.0, isn't STORM-1977 supposed to be closed?

2017-08-25 Thread Jungtaek Lim
I'm not sure. Topology code can't be restored, so my best bet would be
detecting it (periodically, or reacting to a failure) and giving up leadership.
If I remember correctly, the leader Nimbus doesn't pull blobs from followers, so
if it is missing blobs and needs to sync, it just needs to become a follower
and let the other Nimbuses take a chance at being the leader.
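The detect-and-step-down idea above can be sketched as a simple set difference. The method name, the blob key strings, and the `otherNimbusCount` parameter are illustrative assumptions, not Nimbus APIs; actual leader election in Storm goes through ZooKeeper and is not shown.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class LeadershipCheck {
    /**
     * Hypothetical periodic check for a leader Nimbus: if blobs needed by active
     * topologies are missing locally (and the leader does not pull them from
     * followers), give up leadership so another Nimbus can take over.
     */
    static boolean shouldGiveUpLeadership(Set<String> requiredBlobKeys,
                                          Set<String> localBlobKeys,
                                          int otherNimbusCount) {
        Set<String> missing = new HashSet<>(requiredBlobKeys);
        missing.removeAll(localBlobKeys);
        // With no other Nimbus (non-HA), stepping down cannot help.
        return !missing.isEmpty() && otherNimbusCount > 0;
    }

    public static void main(String[] args) {
        Set<String> required = new HashSet<>(Arrays.asList("topo-1-stormcode.ser", "topo-1-stormconf.ser"));
        Set<String> local = new HashSet<>(Arrays.asList("topo-1-stormconf.ser"));
        System.out.println(shouldGiveUpLeadership(required, local, 2)); // true
        System.out.println(shouldGiveUpLeadership(required, local, 0)); // false: nobody to hand over to
    }
}
```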

This would not help a non-HA Nimbus cluster anyway. What's next? Maybe we
could remove the related topology (from ZK), but it feels a bit dangerous to me
to do that automatically, so I'd rather lean on a (maybe CLI) tool so that
operators can do it manually.

Please file an issue regarding this if you want a solution.

Best regards,
Jungtaek Lim (HeartSaVioR)

On Fri, Aug 25, 2017 at 6:29 AM, Alexandre Vermeerbergen wrote:

> Hello Jungtaek,
>
> I confirm that we currently do not have multiple Nimbus nodes.
>
> I want to clarify that the Nimbus process never crashed: it keeps printing
> this error in its log:
>
> 2017-08-06 03:44:01.777 o.a.s.t.ProcessFunction pool-14-thread-1 [ERROR] Internal error processing getClusterInfo
> org.apache.storm.generated.KeyNotFoundException: null
> ... (rest of the stack trace)
>
> However, I forgot to mention that:
>
> * While this problem occurs, our topologies are unaffected
> * We notice the problem because we have a self-healing check on the Nimbus
> UI process, which calls web services to get a few stats on running
> topologies. When Nimbus UI is unresponsive, we kill it and restart it
> automatically.
> * Our Nimbus UI self-healing cron runs every minute
> * When the getClusterInfo stack trace appears in the Nimbus log, Nimbus UI
> is unable to restart
> * Here are the kinds of exceptions we see in ui.log (Nimbus UI's trace):
>
> 2017-08-24 21:24:21.777 o.a.s.u.core main [INFO] Starting ui server for storm version '1.1.0'
> 2017-08-24 21:24:21.788 o.a.s.d.m.MetricsUtils main [INFO] Using statistics reporter plugin:org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter
> 2017-08-24 21:24:21.790 o.a.s.d.m.r.JmxPreparableReporter main [INFO] Preparing...
> 2017-08-24 21:24:21.801 o.a.s.d.common main [INFO] Started statistics report plugin...
> 2017-08-24 21:24:21.878 o.a.s.s.o.e.j.s.Server main [INFO] jetty-7.x.y-SNAPSHOT
> 2017-08-24 21:24:21.929 o.a.s.s.o.e.j.s.h.ContextHandler main [INFO] started o.a.s.s.o.e.j.s.ServletContextHandler{/,null}
> 2017-08-24 21:24:21.965 o.a.s.s.o.e.j.s.AbstractConnector main [INFO] Started SelectChannelConnector@0.0.0.0:8070
> 2017-08-24 21:25:02.446 o.a.s.u.NimbusClient qtp2142536057-19 [WARN] Ignoring exception while trying to get leader nimbus info from ec2-52-48-59-151.eu-west-1.compute.amazonaws.com. will retry with a different seed host.
> org.apache.storm.thrift.transport.TTransportException: null
>     at org.apache.storm.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.transport.TTransport.readAll(TTransport.java:86) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:129) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.transport.TFramedTransport.read(TFramedTransport.java:101) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.transport.TTransport.readAll(TTransport.java:86) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.generated.Nimbus$Client.recv_getLeader(Nimbus.java:1193) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.generated.Nimbus$Client.getLeader(Nimbus.java:1181) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:84) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.ui.core$cluster_summary.invoke(core.clj:355) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.ui.core$fn__9556.invoke(core.clj:1113) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.shade.compojure.core$make_route$fn__5976.invoke(core.clj:100) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.shade.compojure.core$if_route$fn__5964.invoke(core.clj:46) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.shade.compojure.core$if_method$fn__5957.invoke(core.clj:31) ~[storm-core-1.1.0.jar:1.1.0]
>     at org.apache.storm.shade.compojure.core$routing$fn__598

[DISCUSS] Release Storm 1.0.5 / 1.1.2

2017-08-25 Thread Jungtaek Lim
Hi devs,

We received a bug report (STORM-2682) on Storm 1.0.4 and
1.1.1 that prevents a Storm cluster from being updated. Personally, it looks
pretty critical to me, and hopefully it is fixed now.
So we may want to make another round of bug fix releases quickly for the
affected 1.x version lines. What do you think?

Also, please list any bug fix issues you would like to include in the new
bug fix releases, so that we can create epic issues and track them to get
the releases out sooner.

Thanks,
Jungtaek Lim (HeartSaVioR)