[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2261#discussion_r134645082
  
--- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List
 public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-if ((lastUpdate + 1000) < System.currentTimeMillis()) {
-int local_total = 0;
-for (int i = 0; i < targets.length; i++) {
-int val = (int)(101 - (load.get(targets[i]) * 100));
-loads[i] = val;
-local_total += val;
+int rightNow;
+while (true) {
+rightNow = current.incrementAndGet();
+if (rightNow < CAPACITY) {
+return rets[choices[rightNow]];
+} else if (rightNow == CAPACITY) {
+current.set(0);
+return rets[choices[0]];
 }
-total = local_total;
-lastUpdate = System.currentTimeMillis();
+//race condition with another thread, and we lost
+// try again
 }
-int selected = random.nextInt(total);
-int sum = 0;
-for (int i = 0; i < targets.length; i++) {
-sum += loads[i];
-if (selected < sum) {
-return rets[i];
+}
+
+private void updateRing(LoadMapping load) {
+int localTotal = 0;
+for (int i = 0 ; i < targets.length; i++) {
+int val = (int)(101 - (load.get(targets[i]) * 100));
+loads[i] = val;
+localTotal += val;
+}
+
+int currentIdx = 0;
+int unassignedIdx = 0;
+for (int i = 0 ; i < loads.length ; i++) {
+if (currentIdx == CAPACITY) {
+break;
 }
+
+int loadForTask = loads[i];
+int amount = Math.round(loadForTask * 1.0f * CAPACITY / localTotal);
+// assign at least one for task
+if (amount == 0) {
+unassigned[unassignedIdx++] = i;
+}
+for (int j = 0; j < amount; j++) {
+if (currentIdx == CAPACITY) {
+break;
+}
+
+prepareChoices[currentIdx++] = i;
+}
+}
+
+if (currentIdx < CAPACITY) {
+// if there're some rooms, give unassigned tasks a chance to be included
+// this should be really small amount, so just add them sequentially
+if (unassignedIdx > 0) {
+for (int i = currentIdx ; i < CAPACITY ; i++) {
+prepareChoices[i] = unassigned[(i - currentIdx) % unassignedIdx];
+}
+} else {
+// just pick random
+for (int i = currentIdx ; i < CAPACITY ; i++) {
+prepareChoices[i] = random.nextInt(loads.length);
+}
+}
+}
+
+shuffleArray(prepareChoices);
+
+// swapping two arrays
+int[] tempForSwap = choices;
+choices = prepareChoices;
+prepareChoices = tempForSwap;
+
+current.set(0);
--- End diff --

Again, logically this should be -1, because we **increment and get**. Unlike `chooseTasks()`, which returns `choices[0]` right after calling `set(0)`, this method doesn't consume slot 0, so the next caller starts at slot 1. It doesn't hurt much anyway.
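Below is a minimal single-threaded sketch of the off-by-one. The class and method names are hypothetical and not from the patch. It resets an `AtomicInteger` the way `updateRing()` does, then performs the `incrementAndGet()` that `chooseTasks()` starts with.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo (not from the patch): which ring slot does the next
// incrementAndGet() land on after the counter is reset?
class ResetOffByOneDemo {
    /** Resets a counter to resetValue and returns the slot the next caller would pick. */
    static int nextSlotAfterReset(int resetValue) {
        AtomicInteger current = new AtomicInteger(500); // arbitrary position mid-ring
        current.set(resetValue);                        // like updateRing() after swapping arrays
        return current.incrementAndGet();               // like the first step of chooseTasks()
    }

    public static void main(String[] args) {
        System.out.println("reset to 0  -> next slot " + nextSlotAfterReset(0));  // prints 1
        System.out.println("reset to -1 -> next slot " + nextSlotAfterReset(-1)); // prints 0
    }
}
```

With a reset to 0, slot 0 is not lost for good. The caller whose `incrementAndGet()` later returns `CAPACITY` hands out `choices[0]`, so slot 0 is simply picked last in that cycle instead of first. That fits the remark that it doesn't hurt much.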


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2261#discussion_r134644962
  
--- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -20,30 +20,56 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.WorkerTopologyContext;
 
 public class LoadAwareShuffleGrouping implements LoadAwareCustomStreamGrouping, Serializable {
+private static final int CAPACITY = 1000;
+
 private Random random;
 private List[] rets;
 private int[] targets;
 private int[] loads;
-private int total;
-private long lastUpdate = 0;
+private int[] unassigned;
+private int[] choices;
+private int[] prepareChoices;
+private AtomicInteger current;
 
 @Override
 public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
 random = new Random();
+
 rets = (List[])new List[targetTasks.size()];
 targets = new int[targetTasks.size()];
 for (int i = 0; i < targets.length; i++) {
 rets[i] = Arrays.asList(targetTasks.get(i));
 targets[i] = targetTasks.get(i);
 }
+
+// can't leave choices to be empty, so initiate it similar as ShuffleGrouping
+choices = new int[CAPACITY];
+
+for (int i = 0 ; i < CAPACITY ; i++) {
+choices[i] = i % rets.length;
+}
+
+shuffleArray(choices);
+current = new AtomicInteger(0);
--- End diff --

Logically this should be -1, because we **increment and get**: starting from 0, the first call returns slot 1 instead of slot 0. It doesn't hurt much, though.
(The same applies to ShuffleGrouping.)
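The sketch below shows what the initial value actually changes. It is a simplified, hypothetical version of the ring loop in `chooseTasks()` that returns slot indices instead of `rets` entries. It runs single-threaded and uses a small capacity instead of `CAPACITY = 1000`. It suggests that starting at 0 only affects ordering: slot 0 is picked at the wrap point instead of first, and each slot is still picked exactly once per cycle.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified copy of the ring loop in chooseTasks(): it returns the slot
// index instead of rets[choices[slot]] so that coverage is easy to inspect.
class RingChooser {
    private final int capacity;
    private final AtomicInteger current;

    RingChooser(int capacity, int initialValue) {
        this.capacity = capacity;
        this.current = new AtomicInteger(initialValue);
    }

    /** Picks the next slot, wrapping around the same way the patch does. */
    int nextSlot() {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < capacity) {
                return rightNow;
            } else if (rightNow == capacity) {
                current.set(0);
                return 0;
            }
            // another thread hit the wrap point first; try again
        }
    }

    /** Counts how often each slot is picked during the first n calls. */
    static int[] histogram(int capacity, int initialValue, int n) {
        RingChooser chooser = new RingChooser(capacity, initialValue);
        int[] counts = new int[capacity];
        for (int i = 0; i < n; i++) {
            counts[chooser.nextSlot()]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(new RingChooser(10, 0).nextSlot());  // first pick when starting from 0
        System.out.println(new RingChooser(10, -1).nextSlot()); // first pick when starting from -1
        System.out.println(java.util.Arrays.toString(histogram(10, 0, 10)));
        System.out.println(java.util.Arrays.toString(histogram(10, -1, 10)));
    }
}
```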



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2261#discussion_r134644598
  
--- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List
 public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-if ((lastUpdate + 1000) < System.currentTimeMillis()) {
-int local_total = 0;
-for (int i = 0; i < targets.length; i++) {
-int val = (int)(101 - (load.get(targets[i]) * 100));
-loads[i] = val;
-local_total += val;
+int rightNow;
+while (true) {
+rightNow = current.incrementAndGet();
+if (rightNow < CAPACITY) {
+return rets[choices[rightNow]];
+} else if (rightNow == CAPACITY) {
+current.set(0);
--- End diff --

I borrowed this from ShuffleGrouping. I'm not sure whether it introduces a race condition, but I think it is acceptable either way. The race scenarios don't cause an out-of-bounds index; they just let some threads select the same index and maybe skip some indices.
Moreover, this patch contains tests that address multi-thread safety.

We could still replace `set` with `getAndSet` to make it fully thread-safe, but we would need to do more experiments if we want to. The same applies to ShuffleGrouping.
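Below is a rough, hypothetical stress check of the claim that the race never produces an out-of-range index. Several threads run the same increment/wrap loop concurrently and count picks per slot. An out-of-range slot would throw inside a worker thread and show up as lost picks in the total. This checks only bounds and lost picks, not uniformity, since racing resets can still repeat or skip slots.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

// Hypothetical stress test of the chooseTasks()-style ring counter: many threads
// pick slots concurrently, and every pick must stay inside [0, capacity).
class RingRaceStressTest {
    static int nextSlot(AtomicInteger current, int capacity) {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < capacity) {
                return rightNow;
            } else if (rightNow == capacity) {
                current.set(0); // the reset under discussion
                return 0;
            }
            // lost the race at the wrap point: retry
        }
    }

    /** Runs the picks on several threads and returns how many times each slot was chosen. */
    static AtomicIntegerArray run(int capacity, int threads, int picksPerThread)
            throws InterruptedException {
        AtomicInteger current = new AtomicInteger(0);
        AtomicIntegerArray counts = new AtomicIntegerArray(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < picksPerThread; i++) {
                    // throws ArrayIndexOutOfBoundsException if a slot is ever out of range
                    counts.incrementAndGet(nextSlot(current, capacity));
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("stress test timed out");
        }
        return counts;
    }
}
```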



[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

2017-08-22 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2261
  
The impact of loadAware that you show here seems in line with what I have seen. Encouraging to see these improvements.
I reviewed only the core chooseTasks() implementation and left one comment there.



[GitHub] storm pull request #2261: STORM-2678 Improve performance of LoadAwareShuffle...

2017-08-22 Thread roshannaik
Github user roshannaik commented on a diff in the pull request:

https://github.com/apache/storm/pull/2261#discussion_r134640202
  
--- Diff: storm-client/src/jvm/org/apache/storm/grouping/LoadAwareShuffleGrouping.java ---
@@ -52,25 +78,92 @@ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List
 public List<Integer> chooseTasks(int taskId, List<Object> values, LoadMapping load) {
-if ((lastUpdate + 1000) < System.currentTimeMillis()) {
-int local_total = 0;
-for (int i = 0; i < targets.length; i++) {
-int val = (int)(101 - (load.get(targets[i]) * 100));
-loads[i] = val;
-local_total += val;
+int rightNow;
+while (true) {
+rightNow = current.incrementAndGet();
+if (rightNow < CAPACITY) {
+return rets[choices[rightNow]];
+} else if (rightNow == CAPACITY) {
+current.set(0);
--- End diff --

If you are trying to make this thread-safe, I suspect this `current.set(0)` is a race condition. I'm not sure whether it's an acceptable race condition or not.
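One interleaving of the suspected kind can be replayed deterministically on a single thread. In this hypothetical sketch, thread A is a `chooseTasks()` caller that sees `CAPACITY`, U is `updateRing()`, and B and C are other `chooseTasks()` callers. The sketch assumes `updateRing()` can run concurrently with `chooseTasks()`. Two callers end up with the same slot, but every value handed out stays inside `[0, CAPACITY)`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-threaded replay of one interleaving on the shared counter.
// Each step is labeled with the thread that would perform it in a real run.
class ResetRaceReplay {
    static final int CAPACITY = 1000;

    /** Returns the slots handed to threads B and C in the replayed interleaving. */
    static int[] replay() {
        AtomicInteger current = new AtomicInteger(CAPACITY - 1);
        int a = current.incrementAndGet(); // A: sees CAPACITY, is about to call set(0)
        current.set(0);                    // U: updateRing() resets the counter first
        int b = current.incrementAndGet(); // B: gets slot 1
        current.set(0);                    // A: its delayed reset rewinds the counter
        int c = current.incrementAndGet(); // C: also gets slot 1
        if (a != CAPACITY) {
            throw new IllegalStateException("unexpected replay state");
        }
        return new int[] {b, c};
    }

    public static void main(String[] args) {
        int[] slots = replay();
        System.out.println("B got slot " + slots[0] + ", C got slot " + slots[1]);
    }
}
```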



[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2270
  
@roshannaik Thanks for the testing and comments. I didn't test it on a multi-rack cluster since I don't have the resources to do it. Instead, I simulated it by adding some network latency with the Linux `tc` command. Thanks for the good ideas; I will consider them in future work.



[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2270
  
@Ethanlm I would like to leave you with some thoughts for the future... maybe you have already thought about them.
You could also consider basing this on a user-defined locality assignment. Users would assign machines/supervisors to a locality (just a number/name) and specify a logical distance (just a number) between localities. Organizations often use hostname naming patterns that indicate whether machines are on the same rack. So a regex pattern could be used to specify which (supervisor) hostnames belong to a particular rack/locality.
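Below is a minimal sketch of the suggested user-defined locality. It assumes users supply one hostname regex per locality plus a symmetric table of logical distances. `RegexLocalityMapper` and the example hostnames are hypothetical; nothing like this exists in the patch.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: map supervisor hostnames to user-defined localities via regex,
// and look up a user-specified logical distance between localities.
class RegexLocalityMapper {
    private final Map<String, Pattern> localityPatterns = new LinkedHashMap<>();
    private final Map<String, Integer> distances = new HashMap<>();

    void addLocality(String locality, String hostnameRegex) {
        localityPatterns.put(locality, Pattern.compile(hostnameRegex));
    }

    /** Records a symmetric logical distance between two localities. */
    void setDistance(String a, String b, int distance) {
        distances.put(key(a, b), distance);
        distances.put(key(b, a), distance);
    }

    /** Returns the first locality whose regex fully matches the hostname, or null if none does. */
    String localityOf(String hostname) {
        for (Map.Entry<String, Pattern> e : localityPatterns.entrySet()) {
            if (e.getValue().matcher(hostname).matches()) {
                return e.getKey();
            }
        }
        return null;
    }

    /** 0 within a locality, the configured distance across localities, MAX_VALUE if unknown. */
    int distance(String hostA, String hostB) {
        String a = localityOf(hostA);
        String b = localityOf(hostB);
        if (a == null || b == null) {
            return Integer.MAX_VALUE;
        }
        if (a.equals(b)) {
            return 0;
        }
        return distances.getOrDefault(key(a, b), Integer.MAX_VALUE);
    }

    private static String key(String a, String b) {
        return a + "->" + b;
    }
}
```

A grouping could then prefer targets with the smallest `distance()` from the local host, falling back to farther localities as needed.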



[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2270
  
OK, that's great, since I noticed that the -c mode was not working for the grouping option due to a bug in the topology creation code.

BTW, I want to correct something important that I said incorrectly earlier: the ACKer is actually slower than inter-worker transfer (not the other way around, as I accidentally said).

Your observations about LoadAware are similar to mine, which I made while working on STORM-2306.

With STORM-2306, both ACKer and inter-worker throughputs are a good deal higher (but nowhere close to where they really should be). So I felt it would be interesting to run this analysis on it to observe the impact. Since this patch doesn't apply cleanly on top of 2306, it took me a good amount of time to resolve the conflicts manually.

Anyway here are the results:

**No ACK mode**
LocalOrShuffle: ~**7.12 million/sec**
LASG: ~**6.5 million/sec**

**ACK mode**
LocalOrShuffle: ~**1.23 million/sec**. Latency: avg **0.013 ms**, max **0.015 ms**
LASG: ~**1.26 million/sec**. Latency: avg **0.017 ms**, max **0.035 ms**

LASG is ~8% slower in no-ACK mode. In ACK mode, average latency is close, but max latency is more than 2x higher. That should be OK, though.

Overall these numbers look acceptable to me. 
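For reference, here are the relative differences implied by the numbers above, in the reported units (millions per second and ms). The no-ACK slowdown works out to about 8.7%. In ACK mode, LASG throughput is about 2.4% higher than LocalOrShuffle, and its max latency is about 2.3x higher. `BenchmarkDeltas` is just an illustrative helper.

```java
// Illustrative helper: relative differences computed from the reported benchmark numbers.
class BenchmarkDeltas {
    /** Fraction by which candidate is slower than baseline (negative means faster). */
    static double slowdown(double baseline, double candidate) {
        return (baseline - candidate) / baseline;
    }

    public static void main(String[] args) {
        System.out.printf("no-ACK throughput: LASG %.1f%% slower%n", 100 * slowdown(7.12, 6.5));
        System.out.printf("ACK throughput: LASG %.1f%% slower%n", 100 * slowdown(1.23, 1.26));
        System.out.printf("ACK max latency ratio: %.2fx%n", 0.035 / 0.015);
    }
}
```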

Once (hopefully soon) we fix the two bottlenecks (ACKer and inter-worker), we may have to revisit and optimize this.

Have you had a chance to test this on a real multi-rack cluster and make any observations?



[GitHub] storm issue #2288: [STORM-2700] Should not check Blobstore ACL if the valida...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2288
  
@HeartSaVioR Yes. But changing the default value doesn't seem very simple. We should probably file another JIRA for it if we want to do it.



[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134629852
  
--- Diff: storm-client/test/jvm/org/apache/storm/TestConfigValidate.java ---
@@ -84,21 +83,6 @@ public void invalidConfigTest() throws InvocationTargetException, NoSuchMethodEx
 ConfigValidation.validateFields(conf);
 }
 
-@Test(expected = InvalidTopologyException.class)
--- End diff --

Sure. Will do



[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2270
  
@roshannaik I passed a config file as the second argument when running the topology (`"args: [runDurationSec]  [optionalConfFile]"`). I thought that was the only way.



[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134629272
  
--- Diff: storm-client/test/jvm/org/apache/storm/TestConfigValidate.java ---
@@ -84,21 +83,6 @@ public void invalidConfigTest() throws InvocationTargetException, NoSuchMethodEx
 ConfigValidation.validateFields(conf);
 }
 
-@Test(expected = InvalidTopologyException.class)
--- End diff --

Yes, please do it if you really don't mind. I think we need broader test coverage.



[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134629037
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2529,7 +2529,7 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
 }
 }
 validateTopologyWorkerMaxHeapSizeConfigs(topoConf, topology);
-Utils.validateTopologyBlobStoreMap(topoConf, Sets.newHashSet(blobStore.listKeys()));
+Utils.validateTopologyBlobStoreMap(topoConf);
--- End diff --

I'll see how to do it.



[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134628949
  
--- Diff: storm-client/test/jvm/org/apache/storm/TestConfigValidate.java ---
@@ -84,21 +83,6 @@ public void invalidConfigTest() throws InvocationTargetException, NoSuchMethodEx
 ConfigValidation.validateFields(conf);
 }
 
-@Test(expected = InvalidTopologyException.class)
--- End diff --

I tried to mock it and then decided not to, because the logic in `validateTopologyBlobStoreMap` is really simple. But I will add the unit test if you think it's better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread roshannaik
Github user roshannaik commented on the issue:

https://github.com/apache/storm/pull/2270
  
@Ethanlm For your ConstSpoutNullBolt runs, did you select LASG vs. LocalOrShuffle via the `-c grouping=` command-line option, or via a config file setting?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134621182
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2529,7 +2529,7 @@ public void submitTopologyWithOpts(String topoName, String uploadedJarLocation,
 }
 }
 validateTopologyWorkerMaxHeapSizeConfigs(topoConf, topology);
-Utils.validateTopologyBlobStoreMap(topoConf, Sets.newHashSet(blobStore.listKeys()));
+Utils.validateTopologyBlobStoreMap(topoConf);
--- End diff --

Maybe it's better to add a method that takes NimbusBlobStore as a parameter, to avoid creating a NimbusBlobStore in Nimbus.
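Below is a sketch of the suggested refactoring, under several stated assumptions:
- `BlobKeySource` is a hypothetical stand-in for `NimbusBlobStore`, reduced to a `listKeys()` that returns an `Iterator<String>`, as `Sets.newHashSet(blobStore.listKeys())` in the old call suggests.
- `InvalidBlobMapException` stands in for `InvalidTopologyException`.
- `topology.blobstore.map` is assumed to be the key behind `Config.TOPOLOGY_BLOBSTORE_MAP`.

It checks key existence only, not permissions. Because the client is a parameter, a test can pass a lambda instead of mocking a real blob store.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for NimbusBlobStore; not a real Storm interface.
interface BlobKeySource {
    Iterator<String> listKeys();
}

// Hypothetical stand-in for InvalidTopologyException.
class InvalidBlobMapException extends Exception {
    InvalidBlobMapException(String message) {
        super(message);
    }
}

class BlobMapValidator {
    // Assumed to match the key behind Config.TOPOLOGY_BLOBSTORE_MAP.
    static final String TOPOLOGY_BLOBSTORE_MAP = "topology.blobstore.map";

    /** The blob store client is injected, so callers (Nimbus or a test) decide which one to use. */
    @SuppressWarnings("unchecked")
    static void validateTopologyBlobStoreMap(Map<String, Object> topoConf, BlobKeySource blobStore)
            throws InvalidBlobMapException {
        Map<String, Object> blobMap = (Map<String, Object>) topoConf.get(TOPOLOGY_BLOBSTORE_MAP);
        if (blobMap == null) {
            return; // the topology requests no blobs
        }
        Set<String> existing = new HashSet<>();
        for (Iterator<String> it = blobStore.listKeys(); it.hasNext(); ) {
            existing.add(it.next());
        }
        for (String key : blobMap.keySet()) {
            if (!existing.contains(key)) {
                throw new InvalidBlobMapException("Key not found in blob store: " + key);
            }
        }
    }
}
```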



[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2291#discussion_r134621377
  
--- Diff: storm-client/test/jvm/org/apache/storm/TestConfigValidate.java ---
@@ -84,21 +83,6 @@ public void invalidConfigTest() throws InvocationTargetException, NoSuchMethodEx
 ConfigValidation.validateFields(conf);
 }
 
-@Test(expected = InvalidTopologyException.class)
--- End diff --

Could we mock NimbusBlobStore and retain the test?



[GitHub] storm issue #2290: [STORM-2703] Handle ExecutionException in handleWaitingFo...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2290
  
+1 Nice finding.



[GitHub] storm issue #2288: [STORM-2700] Should not check Blobstore ACL if the valida...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2288
  
First of all, the change looks good.

Btw, the default value of `storm.blobstore.acl.validation.enabled` is `false`, so this change will affect all existing clusters. Moreover, I don't think `false` is a good default for a secure cluster. Maybe we should explore changing the default to `true` and check that it also works with insecure clusters.
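The sketch below shows why a `false` default touches every existing cluster. It gates the ACL check on `storm.blobstore.acl.validation.enabled`. The helper names are made up; only the key and its current default come from this thread. Any cluster that never sets the key falls into the skip branch.

```java
import java.util.Map;

// Hypothetical sketch of gating the blob ACL check on the config flag discussed above.
class BlobAclGate {
    static final String ACL_VALIDATION_ENABLED = "storm.blobstore.acl.validation.enabled";

    /** Reads the flag, falling back to the current default (false) when it is unset. */
    static boolean aclValidationEnabled(Map<String, Object> conf) {
        Object value = conf.get(ACL_VALIDATION_ENABLED);
        if (value == null) {
            return false;
        }
        return value instanceof Boolean ? (Boolean) value : Boolean.parseBoolean(value.toString());
    }

    /** Runs the ACL check only when validation is enabled; returns whether it ran. */
    static boolean maybeCheckAcl(Map<String, Object> conf, Runnable aclCheck) {
        if (!aclValidationEnabled(conf)) {
            return false; // with the default of false, clusters that never set the key skip it
        }
        aclCheck.run();
        return true;
    }
}
```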



[GitHub] storm issue #2199: [STORM-2201] Add dynamic scheduler configuration loading

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2199
  
I rebased the branch and addressed some of your comments. I am still 
working on the rest of them



[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r134612074
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java ---
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Time;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+/**
+ * A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory.
+ *
+ * Configuration items for this config loader are passed in via config settings in
+ * each scheduler that has a configurable loader.
+ *
+ * For example, the resource aware scheduler has configuration items defined in DaemonConfig.java
+ * that allow a user to configure which implementation of IConfigLoader to use to load
+ * specific scheduler configs as well as any parameters to pass into the prepare method of
+ * that configuration.
+ *
+ * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader
+ *
+ * and then resource.aware.scheduler.user.pools.loader.params can be set to any of the following
+ *
+ * 
+ * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"}
+ *
+ * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"}
+ *
+ * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"}
+ * 
+ *
+ */
+public class ArtifactoryConfigLoader implements IConfigLoader {
+protected static final String ARTIFACTORY_URI = "artifactory.config.loader.uri";
+protected static final String ARTIFACTORY_TIMEOUT_SECS = "artifactory.config.loader.timeout.secs";
+protected static final String ARTIFACTORY_POLL_TIME_SECS = "artifactory.config.loader.polltime.secs";
+protected static final String ARTIFACTORY_SCHEME = "artifactory.config.loader.scheme";
+protected static final String ARTIFACTORY_BASE_DIRECTORY = "artifactory.config.loader.base.directory";
+protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
+static final String cacheFilename = "latest.yaml";
+
+private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class);
+
+@SuppressWarnings("rawtypes")
+private Map _conf;
+private int _artifactoryPollTimeSecs = 600;
+private boolean _cacheInitialized = false;
+// Location of the file in the artifactory archive.  Also used to name file in cache.
+private String _localCacheDir;
+private String _artifactoryScheme = "http";
+private String _baseDirectory = "/artifactory";
+private int _lastReturnedTime = 0;
+private int _timeoutSeconds = 10;
+private Map _lastReturnedValue;
+
+/**
+ * A private class used t
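The Javadoc above describes the params map passed to the loader. Below is a hypothetical sketch of reading it, with defaults copied from the field initializers in the diff: timeout 10 s, poll time 600 s, scheme `http`, base directory `/artifactory`. Two points are assumptions: that the real loader treats the URI as required, and that it parses values this way. Values are read via `toString()` because the Javadoc example passes the timeout as the string `"60"`.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch of parsing the ArtifactoryConfigLoader params described in the Javadoc.
// Defaults mirror the field initializers shown in the diff.
class ArtifactoryLoaderParams {
    final URI uri;
    final int timeoutSecs;
    final int pollTimeSecs;
    final String scheme;
    final String baseDirectory;

    ArtifactoryLoaderParams(Map<String, Object> params) {
        Object uriValue = params.get("artifactory.config.loader.uri");
        if (uriValue == null) {
            throw new IllegalArgumentException("artifactory.config.loader.uri is required");
        }
        this.uri = URI.create(uriValue.toString());
        this.timeoutSecs = intParam(params, "artifactory.config.loader.timeout.secs", 10);
        this.pollTimeSecs = intParam(params, "artifactory.config.loader.polltime.secs", 600);
        this.scheme = stringParam(params, "artifactory.config.loader.scheme", "http");
        this.baseDirectory = stringParam(params, "artifactory.config.loader.base.directory", "/artifactory");
    }

    /** True when the URI points at a local file rather than an Artifactory server. */
    boolean isLocalFile() {
        return "file".equalsIgnoreCase(uri.getScheme());
    }

    private static int intParam(Map<String, Object> params, String key, int defaultValue) {
        Object value = params.get(key);
        return value == null ? defaultValue : Integer.parseInt(value.toString());
    }

    private static String stringParam(Map<String, Object> params, String key, String defaultValue) {
        Object value = params.get(key);
        return value == null ? defaultValue : value.toString();
    }
}
```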

[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r134611776
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java ---
@@ -0,0 +1,416 @@
+ * A private class us

[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r134608551
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
 ---
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.storm.Config;
+import org.apache.storm.utils.Time;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+/**
+ * A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory.
+ *
+ * Configuration items for this config loader are passed in via config settings in
+ * each scheduler that has a configurable loader.
+ *
+ * For example, the resource aware scheduler has configuration items defined in DaemonConfig.java
+ * that allow a user to configure which implementation of IConfigLoader to use to load
+ * specific scheduler configs as well as any parameters to pass into the prepare method of
+ * that configuration.
+ *
+ * resource.aware.scheduler.user.pools.loader can be set to org.apache.storm.scheduler.utils.ArtifactoryConfigLoader
+ *
+ * and then resource.aware.scheduler.user.pools.loader.params can be set to any of the following
+ *
+ * 
+ * {"artifactory.config.loader.uri": "http://artifactory.example.org:9989/artifactory/confs/my_cluster/mt_user_pool"}
+ *
+ * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool"}
+ *
+ * {"artifactory.config.loader.uri": "file:///confs/my_cluster/mt_user_pool", "artifactory.config.loader.timeout.secs" : "60"}
+ * 
+ *
+ */
+public class ArtifactoryConfigLoader implements IConfigLoader {
+protected static final String ARTIFACTORY_URI = "artifactory.config.loader.uri";
+protected static final String ARTIFACTORY_TIMEOUT_SECS = "artifactory.config.loader.timeout.secs";
+protected static final String ARTIFACTORY_POLL_TIME_SECS = "artifactory.config.loader.polltime.secs";
+protected static final String ARTIFACTORY_SCHEME = "artifactory.config.loader.scheme";
+protected static final String ARTIFACTORY_BASE_DIRECTORY = "artifactory.config.loader.base.directory";
+protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
+static final String cacheFilename = "latest.yaml";
+
+private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class);
+
+@SuppressWarnings("rawtypes")
+private Map _conf;
+private int _artifactoryPollTimeSecs = 600;
+private boolean _cacheInitialized = false;
+// Location of the file in the artifactory archive.  Also used to name file in cache.
+private String _localCacheDir;
+private String _artifactoryScheme = "http";
+private String _baseDirectory = "/artifactory";
+private int _lastReturnedTime = 0;
+private int _timeoutSeconds = 10;
+private Map _lastReturnedValue;
+
+/**
+ * A private class used t
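
For readers wiring up `resource.aware.scheduler.user.pools.loader.params`, here is a minimal sketch of how such a params map might be read. It is not the code from this PR: the class `ArtifactoryLoaderParams` and its methods are hypothetical. Only the key names and the defaults (`_timeoutSeconds = 10`, `_artifactoryPollTimeSecs = 600`) come from the diff above.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical helper, not part of the PR: reads the loader params described in the
// Javadoc, falling back to the field defaults shown in the diff (timeout 10 s, poll 600 s).
class ArtifactoryLoaderParams {
    static final String URI_KEY = "artifactory.config.loader.uri";
    static final String TIMEOUT_KEY = "artifactory.config.loader.timeout.secs";
    static final String POLL_TIME_KEY = "artifactory.config.loader.polltime.secs";

    final URI uri;
    final int timeoutSecs;
    final int pollTimeSecs;

    ArtifactoryLoaderParams(Map<String, Object> params) {
        Object rawUri = params.get(URI_KEY);
        if (rawUri == null) {
            throw new IllegalArgumentException("missing " + URI_KEY);
        }
        this.uri = URI.create(rawUri.toString());
        this.timeoutSecs = intOrDefault(params.get(TIMEOUT_KEY), 10);
        this.pollTimeSecs = intOrDefault(params.get(POLL_TIME_KEY), 600);
    }

    // true for the file:///... examples, false for the http://... example
    boolean isLocalFile() {
        return "file".equalsIgnoreCase(uri.getScheme());
    }

    // The JSON example passes the timeout as a string ("60"), so accept strings and numbers.
    private static int intOrDefault(Object value, int defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        return Integer.parseInt(value.toString().trim());
    }
}
```

Accepting both strings and numbers is a design choice for the sketch, since the Javadoc example quotes the timeout value while other callers might pass an integer.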

[GitHub] storm pull request #2199: [STORM-2201] Add dynamic scheduler configuration l...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2199#discussion_r134607693
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
 ---
@@ -0,0 +1,416 @@

[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2270
  
@HeartSaVioR Thanks for the review. Yes, you are right: in this case LocalOrShuffle and Shuffle are the same. I just wanted to be consistent with the experiments above.
Comparing our results, it looks like the `new LocalOrShuffle` gets performance similar to `LocalityASG` in single-worker mode. I personally think these numbers are good enough for now (especially for cases with more network latency), and if this gets merged, we can start working on follow-up improvements.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #2291: [STORM-2704] Check blob permission before submitting the ...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2291
  
Tested it manually. All the experiments submitted topologies as `ethan`.

1. With `storm.blobstore.acl.validation.enabled: false`
1.1. When the key does not exist: `InvalidTopologyException` 

![image](https://user-images.githubusercontent.com/14900612/29582138-2ec1c412-8741-11e7-9e0f-b080acb908ff.png)
1.2. When the key exists with ACL set to `u:mapredqa:rwa`: submitted successfully.

![image](https://user-images.githubusercontent.com/14900612/29582141-31f2dbee-8741-11e7-9aa5-faa688132b98.png)

2. With `storm.blobstore.acl.validation.enabled: true`
2.1. When the key does not exist: `InvalidTopologyException`

![image](https://user-images.githubusercontent.com/14900612/29582188-62d0b7ae-8741-11e7-9621-a52bed416a5e.png)
2.2. When the key exists with ACL set to `u:mapredqa:rwa`: `AuthorizationException`

![image](https://user-images.githubusercontent.com/14900612/29582211-76295108-8741-11e7-8aa4-b1c2dc8294d1.png)
2.3. When the key exists with ACL set to `u:ethan:rwa`: submitted successfully.

![image](https://user-images.githubusercontent.com/14900612/29582236-9607c590-8741-11e7-9f24-36ea0a590c7b.png)
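
The outcomes in the manual test above can be summarized as a small decision function. This is a hypothetical sketch, not Storm's implementation: it returns an enum instead of throwing `InvalidTopologyException` or `AuthorizationException`, it only understands the `u:<user>:<perms>` entries used in these tests (any other entry type is ignored), and it assumes read (`r`) permission is what submission requires.

```java
import java.util.List;

// Hypothetical decision sketch mirroring the manual test results; not Storm's code.
class BlobAclCheckSketch {
    enum Outcome { SUBMITTED, INVALID_TOPOLOGY, AUTHORIZATION_ERROR }

    /**
     * @param acl               null when the key does not exist, otherwise entries like "u:ethan:rwa"
     * @param user              the submitting user
     * @param validationEnabled value of storm.blobstore.acl.validation.enabled
     */
    static Outcome check(List<String> acl, String user, boolean validationEnabled) {
        if (acl == null) {
            return Outcome.INVALID_TOPOLOGY;   // missing key is rejected in both modes
        }
        if (!validationEnabled) {
            return Outcome.SUBMITTED;          // only existence is checked
        }
        return canRead(acl, user) ? Outcome.SUBMITTED : Outcome.AUTHORIZATION_ERROR;
    }

    // Assumption: only "u:<user>:<perms>" entries are considered, and 'r' is the required permission.
    static boolean canRead(List<String> acl, String user) {
        for (String entry : acl) {
            String[] parts = entry.split(":", -1);
            if (parts.length == 3 && parts[0].equals("u") && parts[1].equals(user)
                    && parts[2].indexOf('r') >= 0) {
                return true;
            }
        }
        return false;
    }
}
```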













[GitHub] storm issue #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread kamleshbhatt
Github user kamleshbhatt commented on the issue:

https://github.com/apache/storm/pull/2233
  
I was unable to squash the commits, even after multiple attempts.




[GitHub] storm pull request #2291: [STORM-2704] Check blob permission before submitti...

2017-08-22 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2291

[STORM-2704] Check blob permission before submitting the topology

See: https://issues.apache.org/jira/browse/STORM-2704

The original code only checks whether the keys in `topology.blobstore.map` exist in the BlobStore. We need to check the ACLs as well.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-2704

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2291


commit 4387b0f10958d72ceedc88da12aeb546cca1b434
Author: Ethan Li 
Date:   2017-08-22T18:44:33Z

[STORM-2704] Check blob permission before submitting the topology






[GitHub] storm pull request #2287: [STORM-2699] Put all the version information of th...

2017-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2287




[GitHub] storm issue #2290: [STORM-2703] Handle ExecutionException in handleWaitingFo...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2290
  
Tested it manually.

1. Create a blob `key1` with ACL set to `u:mapredqa:rwa`.
2. Submit a topology as `ethan` with `-c topology.blobstore.map='{"key1":{"localname":"test-blob.txt", "uncompress":false}}'`.
3. Check the supervisor.log: `[ethan] does not have access no key1`. The slot changed to the `EMPTY` state, and the supervisor didn't crash.

![image](https://user-images.githubusercontent.com/14900612/29580995-7f3d8470-873d-11e7-9699-342c7cdca4bb.png)
4. Later, set the ACL of blob `key1` with the command `set-acl -s u:ethan:rwa key1`.
5. Check the supervisor.log: the workers started running. The worker.log also showed normal output.

![image](https://user-images.githubusercontent.com/14900612/29581074-bbbe9254-873d-11e7-9d70-f2dec26487b9.png)







[GitHub] storm pull request #2290: [STORM-2703] Handle ExecutionException in handleWa...

2017-08-22 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2290

[STORM-2703] Handle ExecutionException in handleWaitingForBlobLocalization state

See: https://issues.apache.org/jira/browse/STORM-2703

Keep the supervisor from crashing when an exception is thrown because of a blob permission error (or a `KeyNotFoundException`). With this change, the state machine goes back to `MachineState.EMPTY` when that happens and starts over.

So if the blob ACL is fixed (or the key is uploaded) at runtime, the topology will start working; if the topology is killed, the Slot can continue to handle other assignments.
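
The behavior described in this PR can be sketched as a single step of a simplified state machine. Only `MachineState.EMPTY` comes from the description; the other state names, the `SlotSketch` class and the `timeoutMs` parameter are hypothetical simplifications, not Storm's `Slot` code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical, heavily simplified slot state machine (only EMPTY is from the PR text).
class SlotSketch {
    enum MachineState { EMPTY, WAITING_FOR_BLOB_LOCALIZATION, RUNNING }

    static MachineState handleWaitingForBlobLocalization(Future<?> pendingDownload, long timeoutMs) {
        try {
            pendingDownload.get(timeoutMs, TimeUnit.MILLISECONDS);
            return MachineState.RUNNING;                        // blobs are local: launch the worker
        } catch (TimeoutException e) {
            return MachineState.WAITING_FOR_BLOB_LOCALIZATION;  // still downloading: check again later
        } catch (ExecutionException e) {
            // The download failed (e.g. missing ACL or unknown key). Instead of letting the
            // exception crash the supervisor, go back to EMPTY so the assignment is retried.
            return MachineState.EMPTY;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();                 // preserve the interrupt for the caller
            return MachineState.WAITING_FOR_BLOB_LOCALIZATION;
        }
    }
}
```

Returning to `EMPTY` rather than staying in the waiting state means a later pass re-reads the assignment, which is what lets a fixed ACL (or a killed topology) be picked up.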


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-2703

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2290.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2290


commit 3213ba40b9479337e8f9dc1c412f57e6bb57aace
Author: Ethan Li 
Date:   2017-08-22T18:15:30Z

[STORM-2703] Handle ExecutionException in handleWaitingForBlobLocalization state






[GitHub] storm pull request #2257: STORM-2673: add support for prioritizing nodes in ...

2017-08-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2257




[GitHub] storm issue #2288: [STORM-2700] Should not check Blobstore ACL if the valida...

2017-08-22 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2288
  
This error is likely unrelated to this change.




[GitHub] storm issue #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2233
  
Also please squash the commits.




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134441630
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
--- End diff --

You can just take a copy of the values rather than copying the entire map.

`context.forward(Pair.of(key, Pair.of(new ArrayList<>(values), secondMap.removeAll(key))));`
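
A Guava-free sketch of the buffering and forwarding logic, applying the suggestion of copying only the per-key values. Assumptions: `CoGroupSketch` and `CoGroupEmitter` are hypothetical names, the emitter stands in for `context.forward(Pair.of(key, Pair.of(first, second)))`, a plain `HashMap<K, List<V>>` replaces `ArrayListMultimap`, and state is cleared after each forward as `finish()` does in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for context.forward(Pair.of(key, Pair.of(first, second))).
interface CoGroupEmitter<K, V1, V2> {
    void emit(K key, List<V1> firstValues, List<V2> secondValues);
}

// Guava-free sketch of the co-group buffering; not the Storm processor itself.
class CoGroupSketch<K, V1, V2> {
    private final Map<K, List<V1>> firstMap = new HashMap<>();
    private final Map<K, List<V2>> secondMap = new HashMap<>();

    void putFirst(K key, V1 value) {
        firstMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void putSecond(K key, V2 value) {
        secondMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void forwardValues(CoGroupEmitter<K, V1, V2> emitter) {
        // Keys seen on the first stream: copy just this key's values, and take
        // (remove) the matching second-stream values so they are not emitted twice.
        for (Map.Entry<K, List<V1>> e : firstMap.entrySet()) {
            List<V2> matched = secondMap.remove(e.getKey());
            emitter.emit(e.getKey(), new ArrayList<>(e.getValue()),
                    matched == null ? Collections.<V2>emptyList() : matched);
        }
        // Keys left in secondMap were only seen on the second stream.
        for (Map.Entry<K, List<V2>> e : secondMap.entrySet()) {
            emitter.emit(e.getKey(), Collections.<V1>emptyList(), new ArrayList<>(e.getValue()));
        }
        firstMap.clear();
        secondMap.clear();
    }
}
```

The per-key copy keeps the emitted lists independent of the internal buffers. In the Guava version this matters more, because the collections returned by `Multimap.asMap()` are live views of the multimap.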




[GitHub] storm pull request #2233: Storm 2258: Streams api - support CoGroupByKey

2017-08-22 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2233#discussion_r134441647
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/streams/processors/CoGroupByKeyProcessor.java
 ---
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.streams.processors;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.Pair;
+
+/**
+ * co-group by key implementation
+ */
+public class CoGroupByKeyProcessor<K, V1, V2> extends BaseProcessor<Pair<K, ?>> implements BatchProcessor {
+private final String firstStream;
+private final String secondStream;
+private final Multimap<K, V1> firstMap = ArrayListMultimap.create();
+private final Multimap<K, V2> secondMap = ArrayListMultimap.create();
+
+
+public CoGroupByKeyProcessor(String firstStream, String secondStream) {
+this.firstStream = firstStream;
+this.secondStream = secondStream;
+}
+
+@Override
+public void execute(Pair<K, ?> input, String sourceStream) {
+K key = input.getFirst();
+if (sourceStream.equals(firstStream)) {
+V1 val = (V1) input.getSecond();
+firstMap.put(key, val);
+} else if (sourceStream.equals(secondStream)) {
+V2 val = (V2) input.getSecond();
+secondMap.put(key, val);
+}
+if (!context.isWindowed()) {
+forwardValues();
+}
+
+}
+
+@Override
+public void finish() {
+forwardValues();
+firstMap.clear();
+secondMap.clear();
+}
+
+private void forwardValues() {
+Multimap<K, V1> immutableFirstMap = ImmutableMultimap.copyOf(firstMap);
+immutableFirstMap.asMap().forEach((key, values) -> {
+context.forward(Pair.of(key, Pair.of(values, secondMap.removeAll(key))));
+});
+
+Multimap<K, V2> immutableSecondMap = ImmutableMultimap.copyOf(secondMap);
--- End diff --

You can just take a copy of the values rather than copying the entire map.

`context.forward(Pair.of(key, Pair.of(firstMap.removeAll(key), new ArrayList<>(values))));`





[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2261
  
@revans2 @roshannaik 
I know you're busy, but could you find time to take a look at this change? I think it is a clear improvement, and I have provided raw numbers showing the difference.




[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2261
  
Raw numbers are here: 
https://gist.github.com/HeartSaVioR/5e80ab3a58b3e8cf40bab9c6da482639




[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2261
  
I went back to 8f63d5a, which doesn't touch any interfaces, and ran the same tests:

Grouping | transferred (messages) | transfer rate (messages/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
-- | -- | -- | -- | -- | --
New LocalOrShuffle (patch) | 160441600 | 2674026 | 160441600 | 160441580 | 2674026

It is now a bit slower than ShuffleGrouping, but still faster than LoadAwareShuffleGrouping (by about 22%).

So we can choose between a bigger improvement that touches multiple parts, or a still substantial improvement that touches nothing else.

I also tested another change: replacing the List with an array in ShuffleGrouping. The result is below:

Grouping | transferred (messages) | transfer rate (messages/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
-- | -- | -- | -- | -- | --
LocalOrShuffle with loadaware disabled (master) | 161437800 | 2690630 | 161437800 | 161437760 | 2690630

It doesn't seem to bring noticeable improvement.

The reason may be the length of the array: in this test the array is very small (a single element), so `set()` has to be called in addition to `incrementAndGet()` on every call. Note that the array in the patch has 1000 entries, so `set()` is called only once every 1000 calls.

We could grow the array in `prepare()` to get better performance, but that would be a micro-optimization, and I'm not sure it is worth applying.
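
The wrap-around cost described above can be seen in a minimal sketch of a ring-based chooser: a shared counter advanced with `incrementAndGet()` and reset with `set(0)` once per pass over the ring. `RingChooser` and its constructor, which grows a short list of choices into a larger ring by repetition (the `prepare()` idea mentioned above), are hypothetical illustrations, not the code in #2261.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ring-based round-robin chooser; not Storm's grouping class.
class RingChooser {
    private final int[] ring;                                       // choices repeated to fill the ring
    private final AtomicInteger current = new AtomicInteger(-1);    // first call lands on slot 0

    RingChooser(int[] choices, int capacity) {
        if (choices.length == 0 || capacity < 1) {
            throw new IllegalArgumentException("need at least one choice and capacity >= 1");
        }
        ring = new int[capacity];
        for (int i = 0; i < capacity; i++) {
            ring[i] = choices[i % choices.length];
        }
    }

    int choose() {
        while (true) {
            int rightNow = current.incrementAndGet();
            if (rightNow < ring.length) {
                return ring[rightNow];      // common path: a single atomic increment
            } else if (rightNow == ring.length) {
                current.set(0);             // wrap: extra write paid once per ring.length calls
                return ring[0];
            }
            // rightNow > ring.length: another thread is wrapping the counter; retry
        }
    }
}
```

With a one-element ring, every call after the first takes the `set(0)` branch, which matches the overhead described in the comment; with a 1000-slot ring it is paid once per 1000 calls.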




[GitHub] storm issue #2270: [STORM-2686] Add Locality Aware Shuffle Grouping

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2270
  
@Ethanlm 
Thanks for doing the follow-up test. Looks good to me.

It looks like there is room for further optimization, but I think that could be handled as a follow-up issue, especially since you are working on follow-up tasks. We're dealing with network latency, which is much larger than anything such optimizations could gain.

By the way, when testing with 1 worker, you don't need to test both LocalOrShuffle and Shuffle, since LocalOrShuffle is just a shuffle that limits target tasks to local ones when possible. If you still want to experiment with performance tests, you may also want to try #2261, since it only changes the numbers for LoadAwareShuffleGrouping.
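
The point that LocalOrShuffle is "a shuffle limited to local tasks when possible" can be sketched as follows. `LocalOrShuffleSketch` is a hypothetical class, not Storm's `ShuffleGrouping`/LocalOrShuffle implementation; it assumes at least one target task and uses plain random selection over the candidates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of "shuffle, but only over local tasks when there are any".
class LocalOrShuffleSketch {
    private final List<Integer> candidates;

    LocalOrShuffleSketch(List<Integer> targetTasks, Set<Integer> workerLocalTasks) {
        List<Integer> local = new ArrayList<>();
        for (Integer task : targetTasks) {
            if (workerLocalTasks.contains(task)) {
                local.add(task);
            }
        }
        // Prefer tasks in the same worker; fall back to every target if none are local.
        this.candidates = local.isEmpty() ? new ArrayList<>(targetTasks) : local;
    }

    List<Integer> candidates() {
        return candidates;
    }

    // Plain random shuffle over the candidate set (assumes it is non-empty).
    int chooseTask() {
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
    }
}
```

With a single worker every target task is local, so the candidate list equals the full target list and the grouping behaves exactly like Shuffle, which is why testing both in that setup is redundant.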




[GitHub] storm issue #2261: STORM-2678 Improve performance of LoadAwareShuffleGroupin...

2017-08-22 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/2261
  
Now I have more numbers to support this patch.

I took the same approach @Ethanlm used for his patch #2270: performance testing on ConstSpoutNullBoltTopo with acking disabled.

1. Config: topology.message.timeout: 300; topology.max.spout.pending: 5000; topology.acker.executors: 0
1. 1 AWS c4.xlarge VM (`dedicated`) to get more accurate results
1. Launched 1 worker, 1 spout task and 1 bolt task. Acking disabled.
1. All experiments ran for 300s.
1. For clarity, only the outputs at 240s are shown.
1. Tested 3 times for each case and picked the run with the median result.
1. Numbers fluctuate slightly during the experiments.

Used 08038b6 (the latest commit) for this patch, and 77354fe (master) as the baseline.

Grouping | transferred (messages) | transfer rate (messages/s) | spout_transferred | spout_acks | spout_throughput (acks/s)
-- | -- | -- | -- | -- | --
New LocalOrShuffle (patch) | 167984520 | 2799742 | 167984520 | 167984520 | 2799742
LocalOrShuffle (master) | 130891240 | 2181520 | 130891240 | 130891260 | 2181520
LocalOrShuffle with loadaware disabled (master) | 161410760 | 2690179 | 161410760 | 161410740 | 2690179

So the new LoadAwareShuffleGrouping is definitely faster than the current LoadAwareShuffleGrouping (by about 28%), and *even faster* than the current ShuffleGrouping (by about 4%).
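
The two percentages follow directly from the transfer rates (messages/s) in the table, patch versus each master configuration:

```math
\frac{2799742}{2181520} \approx 1.283 \;(\approx 28\%\ \text{faster}), \qquad \frac{2799742}{2690179} \approx 1.041 \;(\approx 4\%\ \text{faster})
```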

