[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS

2018-12-11 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2925#discussion_r240657555
  
--- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java ---
@@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec
 while ((len = in.read(buffer)) > 0) {
 out.write(buffer, 0, len);
 }
-out.close();
-} catch (AuthorizationException | IOException | RuntimeException e) {
--- End diff --

Sorry, I was wrong.


---


[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS

2018-12-11 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2925#discussion_r240642462
  
--- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java ---
@@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec
 while ((len = in.read(buffer)) > 0) {
 out.write(buffer, 0, len);
 }
-out.close();
-} catch (AuthorizationException | IOException | RuntimeException e) {
--- End diff --

It looks to me like the exception is still caught instead of thrown?
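
For reference, a minimal sketch (not the actual Storm code) of copying a stream so that the output is always closed while any exception still propagates to the caller:

```
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class StreamCopy {
    // Copies `in` to `out` and always closes `out`; any IOException is
    // propagated to the caller instead of being caught and swallowed here.
    static void copyAndClose(InputStream in, OutputStream out) throws IOException {
        try (OutputStream o = out) {
            byte[] buffer = new byte[4096];
            int len;
            while ((len = in.read(buffer)) > 0) {
                o.write(buffer, 0, len);
            }
        }
    }
}
```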


---


[GitHub] storm pull request #2902: [STORM-3282] Fix RAS worker count estimation

2018-11-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2902#discussion_r231581587
  
--- Diff: storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---
@@ -691,8 +691,10 @@ public static boolean isRAS(Map conf) {
 
 public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology)
 throws InvalidTopologyException {
-return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
   ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
+Double defaultWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB));
+Double topologyWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
--- End diff --

Oh, OK. If there is a default value in the yaml file, then that's fine. I don't think we want to maintain two copies of the default value, one in defaults.yaml and one in the code.


---


[GitHub] storm pull request #2902: [STORM-3282] Fix RAS worker count estimation

2018-11-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2902#discussion_r231542667
  
--- Diff: storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java ---
@@ -691,8 +691,10 @@ public static boolean isRAS(Map conf) {
 
 public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology)
 throws InvalidTopologyException {
-return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) /
   ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)));
+Double defaultWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB));
+Double topologyWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
--- End diff --

Will `topoConf.get(Config.WORKER_HEAP_MEMORY_MB)` be null? If so, this will throw an `IllegalArgumentException("Don't know how to convert null to double")`.


---


[GitHub] storm pull request #2901: [STORM-3271] Docker support: launch storm workers ...

2018-11-06 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2901

[STORM-3271] Docker support:  launch storm workers in docker containers

Spent a lot of effort on this.

This patch lets the storm supervisor launch workers in docker containers so that user code is isolated from other workers and from the host machine. This is a security enhancement.

A detailed explanation is available at docs/Docker-support.md in this PR.
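
Roughly, the idea is that the supervisor wraps the worker's JVM command in a `docker run` invocation. A hedged sketch of the shape of that (illustrative only, not the PR's actual DockerManager code):

```
import java.util.ArrayList;
import java.util.List;

class DockerWorkerCommand {
    // Sketch only: assemble a `docker run` command around the usual worker JVM
    // command so the worker process runs inside a container.
    static List<String> wrapInDocker(String image, String workerDir, List<String> javaCmd) {
        List<String> cmd = new ArrayList<>();
        cmd.add("docker");
        cmd.add("run");
        cmd.add("--rm");                      // clean up the container when the worker exits
        cmd.add("-v");
        cmd.add(workerDir + ":" + workerDir); // bind-mount the worker's local directory
        cmd.add(image);                       // image must come from the configured whitelist
        cmd.addAll(javaCmd);                  // the normal worker launch command
        return cmd;
    }
}
```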

I tested it manually (only on `RHEL7`; I didn't test on other operating systems) and with a secure cluster setup. I also ran our own integration test suites with docker support turned off, which showed that this PR won't break storm.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-3271

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2901.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2901


commit ba8b7cf80347fe077e097206023adb3326c8b4ad
Author: Ethan Li 
Date:   2018-10-12T23:24:15Z

refactor supervisor source code

commit 2387da9ae554ed81573e35545992754bc1454c9d
Author: Ethan Li 
Date:   2018-10-12T23:24:23Z

add docker support

commit b130bcb649f6adcb1fef588dc7033361999a9d0f
Author: Ethan Li 
Date:   2018-10-15T15:27:39Z

add cgrouproot and nscd bind mounts

add user's groups and simply getGroupIdInfo and getUserIdInfo functions

commit be2efc833144004c126ff150f9676adc15575549
Author: Ethan Li 
Date:   2018-10-16T13:48:16Z

address some review comments

commit 80f3296688fce2bec9692a2699ae396a32c8e02e
Author: Ethan Li 
Date:   2018-10-16T22:04:15Z

fix worker-launcher for secure storm cluster

commit d4e2127ef9664c0398f2e85c26d0dde5734d3444
Author: Ethan Li 
Date:   2018-10-17T14:35:19Z

launch docker container in attached mode

commit 8e4c3c7a3cfe2a38dc847ea2ebda2ce74f9a45ff
Author: Ethan Li 
Date:   2018-10-17T16:05:18Z

check prerequisites in prepare method; bind mount workerArtifact more 
precisely

commit e0131136fd864aa5bc4a3b8e40b538d04a1c36ae
Author: Ethan Li 
Date:   2018-10-19T19:57:24Z

fix user could be null (e.g when supervisor restarts)

commit 5dfcd9ff8869f6a4334ccecdefb0f5c1f40ea9fb
Author: Ethan Li 
Date:   2018-10-22T20:24:20Z

fix typo; make network type non-configurable

commit fa5e2999312258b4da0b1936591804780bd8ec8e
Author: Ethan Li 
Date:   2018-10-23T19:55:45Z

make worker-launcher more secure

commit 0f08bcde454f7a2e613eb673ba697236d8122999
Author: Ethan Li 
Date:   2018-10-24T16:28:24Z

make nsenter more secure

commit 721c6b85e38b87761f0a52c19cd9b684bf70cbba
Author: Ethan Li 
Date:   2018-10-24T20:10:56Z

add more comments; add list of readonly bindmounts

commit be88fb7c174ab15e22ab99590530919499fe9d38
Author: Ethan Li 
Date:   2018-10-24T20:53:10Z

making nsenter safer

commit 1d84bf7d86cfa7e3c603ef4f99e0e6c4e2ec9ede
Author: Ethan Li 
Date:   2018-10-25T14:23:18Z

add a whitelist for docker images

commit 9c8ae38bdbc31d897eca9f1779d96d5b995fcec6
Author: Ethan Li 
Date:   2018-10-25T21:20:39Z

check every image in the whitelist; prevent memory leak from worker-launcher; fix format and trivial issues.

commit 52350f4363821ac14a5b7b2114bc1bddb204299a
Author: Ethan Li 
Date:   2018-10-26T20:18:25Z

launch docker container in background; use docker-wait to get the exitCode if the container exits

commit 6c80da77481cccabd047452b055ab3cd68a67c40
Author: Ethan Li 
Date:   2018-10-30T20:46:26Z

add a cgroup sub path template to support various cgroup setup

commit 8b9c18f314c1f8bf12140631f7de052bfc35cff5
Author: Ethan Li 
Date:   2018-10-31T20:19:13Z

add unit tests for docker commands

commit 05b31e875536f01b3fb61a378ce9de3c01d4a5d3
Author: Ethan Li 
Date:   2018-11-01T13:51:00Z

use nsenter to do profiling; mount shared_by_topology/tmp to /tmp

commit a1ffe50265f446b68dac0350c8cb4c49bf05992b
Author: Ethan Li 
Date:   2018-11-01T15:02:12Z

fix logging for storm.resource.isolation.plugin

commit c4647183092541dc5957441500f62719510689f5
Author: Ethan Li 
Date:   2018-11-01T17:38:54Z

fix issues when DockerManager is not used

commit 190f06938a38f4225d94f86c2f1afbba068d1b22
Author: Ethan Li 
Date:   2018-11-05T16:54:32Z

add documentation




---


[GitHub] storm issue #2899: [STORM-3265] flight.bash fall back to use java utils dire...

2018-11-02 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2899
  
Last commit prepended "[STORM-3265]" to the commit message


---


[GitHub] storm pull request #2900: [STORM-3264] fix "local variable referenced before...

2018-11-01 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2900

[STORM-3264] fix "local variable referenced before assignment" in storm.py

minor fix.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2900


commit d0dc3b4bd86657f2be13c1c0cdae2e5933da9eb1
Author: Meng Li (Ethan) 
Date:   2018-11-01T14:08:15Z

[STORM-3264] fix "local variable referenced before assignment" in storm.py




---


[GitHub] storm pull request #2899: flight.bash fall back to use java utils directly i...

2018-11-01 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2899

flight.bash fall back to use java utils directly if $JAVA_HOME not set or java not available in $BINPATH

https://issues.apache.org/jira/browse/STORM-3265

If JAVA_HOME is not set, or `/usr/bin/java` is not available, we should just use `java` directly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-3265

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2899.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2899


commit 686ef4b4063381c57ce71f05296fcb1ffe0badf9
Author: Ethan Li 
Date:   2018-11-01T13:56:57Z

flight.bash fall back to use java utils directly if $JAVA_HOME not set or java not available in $BINPATH




---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-31 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r229880617
  
--- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) {
 }
 }
 
+public static class NumaEntryValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if (o == null) {
+return;
+}
+Map numa = (Map) o;
+for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new IllegalArgumentException(
+"The numa configuration key [" + key + "] is missing!"
+);
+}
+}
+
+List cores = (List) numa.get(NUMA_CORES);
+Set coreSet = new HashSet();
+coreSet.addAll(cores);
+if (coreSet.size() != cores.size()) {
--- End diff --

Thanks for adding this. Do we care about duplicate cores across the NUMA zones? (This check only looks within a single entry.) For example:
```
numaid=0,  cores=[0,1,2,3]
numaid=1, cores=[0,1,2,3]
```
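
A sketch of what a cross-zone check could look like (illustrative only, not the patch):

```
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class NumaCoreCheck {
    // Rejects duplicate core ids both within a single NUMA entry and across
    // entries, which a per-entry HashSet check alone would not catch.
    static void validateNoDuplicateCores(List<List<Integer>> coresPerNumaNode) {
        Set<Integer> seen = new HashSet<>();
        for (List<Integer> cores : coresPerNumaNode) {
            for (Integer core : cores) {
                if (!seen.add(core)) {
                    throw new IllegalArgumentException("Duplicate core " + core + " in NUMA config");
                }
            }
        }
    }
}
```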


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-31 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r229879326
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
 return ret;
 }
 
+/**
+ * Extracting out to mock it for tests.
+ * @return true if on Linux.
+ */
+protected boolean isOnLinux() {
+return SystemUtils.IS_OS_LINUX;
+}
+
+private void prefixNumaPinningIfApplicable(String numaId, List commandList) {
+if (numaId != null) {
+if (isOnLinux()) {
+commandList.add("numactl");
+commandList.add("--i=" + numaId);
--- End diff --

Sorry to keep beating you on this :D  How about:
```
commandList.add(0, "numactl");
commandList.add(1, "--i=" + numaId);
```
since we want to add the prefix to `commandList` whether it's empty or not?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-31 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r229876743
  
--- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java ---
@@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) {
 }
 }
 
+public static class NumaEntryValidator extends Validator {
+
+@Override
+public void validateField(String name, Object o) {
+if (o == null) {
+return;
+}
+Map numa = (Map) o;
+for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new IllegalArgumentException(
+"The numa configuration key [" + key + "] is missing!"
+);
+}
+}
+
+List cores = (List) numa.get(NUMA_CORES);
+Set coreSet = new HashSet();
+coreSet.addAll(cores);
+if (coreSet.size() != cores.size()) {
+throw new IllegalArgumentException(
+"No duplicate cores in NUMA config"
--- End diff --

Better to be "Duplicate cores in NUMA config"? The current message reads like a statement that there are none.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-31 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r229878430
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
 return ret;
 }
 
+/**
+ * Extracting out to mock it for tests.
+ * @return true if on Linux.
+ */
+protected boolean isOnLinux() {
+return SystemUtils.IS_OS_LINUX;
+}
+
+private void prefixNumaPinningIfApplicable(String numaId, List commandList) {
+if (numaId != null) {
+if (isOnLinux()) {
+commandList.add("numactl");
+commandList.add("--i=" + numaId);
+return;
+} else {
+// TODO : Add support for pinning on Windows host
+throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts");
--- End diff --

Maybe `UnsupportedOperationException` is a better option. But I am fine 
with this.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-31 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r229875813
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -1041,6 +1041,24 @@
 @isPositiveNumber
 @NotNull
 public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
+
+/**
+ * A map with keys mapped to each NUMA Node on the supervisor that will be used
+ * by scheduler. CPUs, memory and ports available on each NUMA node will be provided.
+ * Each supervisor will have different map of NUMAs.
+ * Example: "supervisor.numa.meta": {
+ *  "0": { "memory.mb": 122880, "cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17],
+ *  "ports": [6700, 6701]},
+ *  "1" : {"memory.mb": 122880, "cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23],
--- End diff --

`"ports"` is `"numa.ports"` now, since `NUMA_PORTS = "numa.ports"`.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228300609
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + 
NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration 
key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map 
stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
--- End diff --

I agree that we should throw an exception if it's misconfigured. If NUMA is configured, I think we want to make sure it is working properly, or fail fast if it is misconfigured. Storm admins might not notice the "ERROR" message in the first place and would be left wondering why storm is not working as expected (after a relatively long time). If they don't want NUMA, they will not set the NUMA configs.
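
To illustrate the fail-fast behavior I mean, a sketch (names are illustrative, not the actual Utils methods):

```
import java.util.List;
import java.util.Map;

class NumaLookup {
    // Sketch: if NUMA is configured but malformed, throw immediately so the
    // daemon refuses to start, instead of logging an ERROR and returning null.
    @SuppressWarnings("unchecked")
    static String numaIdForPort(int port, Map<String, Map<String, Object>> numaMap) {
        for (Map.Entry<String, Map<String, Object>> entry : numaMap.entrySet()) {
            Object ports = entry.getValue().get("ports");
            if (!(ports instanceof List)) {
                throw new IllegalArgumentException("NUMA entry " + entry.getKey() + " has no ports list");
            }
            if (((List<Object>) ports).contains(port)) {
                return entry.getKey();
            }
        }
        return null; // the port is simply not pinned to any NUMA node
    }
}
```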


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228303554
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -87,7 +87,7 @@ public Slot(AsyncLocalizer localizer, Map conf,
 AtomicReference> cachedCurrentAssignments,
 OnlyLatestExecutor metricsExec,
 WorkerMetricsProcessor metricsProcessor,
-SlotMetrics slotMetrics) throws Exception {
+SlotMetrics slotMetrics, String numaId) throws Exception {
--- End diff --

Can we remove this? It looks like it's not used anywhere


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228307991
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", 
Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock 
out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
+"The numa configurations [" + NUMAS_BASE + "] is 
missing!"
+);
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
--- End diff --

Since we calculate CPU capacity based on the size of `NUMA_CORES`, do we need to validate that there are no dups in `NUMA_CORES`? For example, what will happen if `NUMA_CORES` is `[0, 1, 1, 3]`?

Another example: 
```
{"node_id": 0, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6700, 
6701]},
{"node_id": 1, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6702, 6703]}
```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228298411
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", 
Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock 
out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
--- End diff --

I think `KeyNotFoundException` is mostly about the `blobstore`, and I don't think we should use it here. This is more like an `IllegalArgumentException`.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228321490
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
 ---
@@ -319,7 +319,15 @@ public void testLaunch() throws Exception {
 superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
 superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
 superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
-
+Map numaNode = new HashMap();
+numaNode.put(Utils.NUMA_ID, "0");
+numaNode.put(Utils.NUMA_CORES, Collections.singletonList("0"));
+numaNode.put(Utils.NUMAS_PORTS, Collections.singletonList(8081));
+numaNode.put(Utils.NUMA_MEMORY_IN_MB, 2048);
+Map numaMap = new HashMap();
+numaMap.put(Utils.NUMAS_BASE, Collections.singletonList(numaNode));
+
+superConf.put(Config.SUPERVISOR_NUMA_META, numaMap);
--- End diff --

Remove this since `testNumaPinnedLaunch()` is available?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228319880
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java ---
@@ -81,6 +81,20 @@ public void add(NormalizedResourcesWithMemory other) {
 totalMemoryMb += other.getTotalMemoryMb();
 }
 
+/**
+ * Remove the resources in other from this.
+ * @param other the resources to be removed.
+ * @return true if one or more resources in other were larger than available resources in this, else false.
+ */
+public boolean remove(NormalizedResourcesWithMemory other) {
--- End diff --

Maybe change this to `public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics)` and have the existing `remove(other)` just call `remove(other, null)`.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228298658
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", 
Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock 
out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
+"The numa configurations [" + NUMAS_BASE + "] is 
missing!"
+);
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException(
--- End diff --

Same here. I think `IllegalArgumentException` is better


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228301282
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -327,10 +396,12 @@ public static boolean isSystemId(String id) {
 }
 
 /**
- * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
+ * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds
+ * equal to the return value of the previous
  * call.
--- End diff --

nit: put "call" on the same line as the other words?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228321712
  
--- Diff: 
storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java
 ---
@@ -390,6 +398,116 @@ public void testLaunch() throws Exception {
"storm.log.dir", stormLogDir);
 }
 
+@Test
+public void testNumaPinnedLaunch() throws Exception {
+final String topoId = "test_topology_current";
--- End diff --

It doesn't matter much, but it seems better to have `topoId = "test_topology_numa_pinned";`


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228266730
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java
 ---
@@ -12,16 +12,24 @@
 
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228264081
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
@@ -77,6 +91,15 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
 }
 
 // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228266466
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -17,14 +17,23 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228227560
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -1041,6 +1041,19 @@
 @isPositiveNumber
 @NotNull
 public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
+
+/**
+ * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
--- End diff --

What does `blobstore keys` mean here?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228265639
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
 ---
@@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
 return ret;
 }
 
+/**
+ * Extracting out to mock it for tests.
+ * @return true if on Linux.
+ */
+protected boolean isOnLinux() {
+return SystemUtils.IS_OS_LINUX;
+}
+
+private void addNumaPinningIfApplicable(String numaId, List commandList) {
--- End diff --

It can be confusing since we are actually adding `numactl` as a prefix. The function name makes it look like we are appending `numactl` to the end of the `commandList`, although when we use it, the `commandList` is empty.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228264226
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
@@ -101,7 +124,11 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
 return nimbusInfoSet;
 }
 
-// Get sequence number details from latest sequence number of the blob
+/**
+ * Get sequence number details from latest sequence number of the blob
--- End diff --

```suggestion
 * Get sequence number details from latest sequence number of the blob.
```


---


[GitHub] storm pull request #2744: [STORM-3132] Avoid NPE in the Values Constructor

2018-10-15 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2744#discussion_r225330762
  
--- Diff: storm-client/src/jvm/org/apache/storm/tuple/Values.java ---
@@ -23,9 +23,13 @@ public Values() {
 }
 
 public Values(Object... vals) {
-super(vals.length);
-for (Object o : vals) {
-add(o);
+super(vals != null ? vals.length : 0);
--- End diff --

I think this can be `super(vals != null ? vals.length : 1);` since if `vals == null` we are going to add `null` to it, so the length will be 1.
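
A sketch of the behavior being suggested, assuming `Values` extends `ArrayList<Object>` as it does in Storm (this is not the actual patch):

```
import java.util.ArrayList;

class Values extends ArrayList<Object> {
    public Values(Object... vals) {
        // When the varargs array itself is null, the tuple ends up holding a
        // single null element, so the initial capacity should be 1, not 0.
        super(vals != null ? vals.length : 1);
        if (vals == null) {
            add(null);
        } else {
            for (Object o : vals) {
                add(o);
            }
        }
    }
}
```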


---


[GitHub] storm issue #2844: STORM-3232: Display on the UI all versions of storm that ...

2018-09-25 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2844
  
Why is it `Topologies matching 1.0 will run under 2.0-unittests`? Should it be `run under 2.0.0-SNAPSHOT`, since the nimbus version is `2.0.0-SNAPSHOT`?


---


[GitHub] storm pull request #2843: STORM-3230: Add in sync if key not found

2018-09-19 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2843#discussion_r218905963
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java
 ---
@@ -95,9 +95,12 @@ private static IStormClusterState buildStateIfNeeded(Map conf, T
 throw new IllegalArgumentException("Token is not valid, token has expired.");
 }
 
-PrivateWorkerKey key = keyCache.getUnchecked(deser);
-if (key == null) {
-throw new IllegalArgumentException("Token is not valid, private key not found.");
+PrivateWorkerKey key;
+try {
+key = keyCache.getUnchecked(deser);
+} catch (CacheLoader.InvalidCacheLoadException e) {
+//This happens when the cache has a null returned to it.
--- End diff --

Not sure about this. If `getUnchecked(deser)` would otherwise return null, doesn't it throw an exception instead?
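
For context, a minimal Guava sketch (assuming `keyCache` is a Guava `LoadingCache`, as the caught exception suggests): when the loader returns null, `getUnchecked` throws rather than handing null back.

```
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

class CacheNullDemo {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
            .build(new CacheLoader<String, String>() {
                @Override
                public String load(String key) {
                    return null; // a loader that "finds" nothing
                }
            });
        // getUnchecked never returns null; a null from the loader surfaces as
        // CacheLoader.InvalidCacheLoadException instead.
        cache.getUnchecked("missing-key");
    }
}
```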


---


[GitHub] storm pull request #2823: [STORM-3206] validate topoConf only against Config...

2018-09-11 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2823

[STORM-3206] validate topoConf only against Config.java during submission

Storm should only validate client-side conf when a user submits a topology.

Otherwise, settings like `logviewer.https.port=-1` will fail the submission.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-3206

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2823.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2823


commit e9af95e9b6a6ebc74fec833dc83c6bd18751313c
Author: Ethan Li 
Date:   2018-09-11T15:29:30Z

[STORM-3206] validate topoConf only against Config.java during submission




---


[GitHub] storm pull request #2820: STORM-3215: Add back in impersonation to UI

2018-09-06 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2820#discussion_r215690566
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java ---
@@ -22,74 +22,128 @@
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
-import org.apache.storm.shade.com.google.common.collect.Lists;
-import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Client used for connecting to nimbus.  Typically you want to use a 
variant of the
+ * `getConfiguredClient` static method to get a client to use, as directly 
putting in
+ * a host and port does not support nimbus high availability.
+ */
 public class NimbusClient extends ThriftClient {
 private static final Logger LOG = 
LoggerFactory.getLogger(NimbusClient.class);
 private static volatile Nimbus.Iface _localOverrideClient = null;
 private static String oldLeader = "";
 /**
  * Indicates if this is a special client that is overwritten for local 
mode.
  */
-public final boolean _isLocal;
-private Nimbus.Iface _client;
+public final boolean isLocal;
+private final Nimbus.Iface client;
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @throws TTransportException on any error.
+ */
+@Deprecated
 public NimbusClient(Map conf, String host, int port) 
throws TTransportException {
 this(conf, host, port, null, null);
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @param timeout the timeout to use when connecting.
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host, int port, 
Integer timeout) throws TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, 
null);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClientAs` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @param timeout the timeout to use when connecting.
+ * @param asUser the name of the user you want to impersonate (use 
with caution as it is not always supported).
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host, Integer 
port, Integer timeout, String asUser) throws TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, 
asUser);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host) throws 
TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
 private NimbusClient(Nimbus.Iface client) {
 super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, 
"localhost", null, null, null);
-_client = client;
-_isLocal = true;
+this.client = client;
+isLocal = true;
 }
 
 /**
+ * Is the local override set or not.
  * @return true of new clients will be overridden to connect to a 
local cluster and not the configured remote cluster.
  */
 public static boolean isLocalOverride() {
 return _l

[GitHub] storm pull request #2820: STORM-3215: Add back in impersonation to UI

2018-09-06 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2820#discussion_r215690394
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java ---
@@ -22,74 +22,128 @@
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.security.auth.ThriftClient;
 import org.apache.storm.security.auth.ThriftConnectionType;
-import org.apache.storm.shade.com.google.common.collect.Lists;
-import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Client used for connecting to nimbus.  Typically you want to use a 
variant of the
+ * `getConfiguredClient` static method to get a client to use, as directly 
putting in
+ * a host and port does not support nimbus high availability.
+ */
 public class NimbusClient extends ThriftClient {
 private static final Logger LOG = 
LoggerFactory.getLogger(NimbusClient.class);
 private static volatile Nimbus.Iface _localOverrideClient = null;
 private static String oldLeader = "";
 /**
  * Indicates if this is a special client that is overwritten for local 
mode.
  */
-public final boolean _isLocal;
-private Nimbus.Iface _client;
+public final boolean isLocal;
+private final Nimbus.Iface client;
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @throws TTransportException on any error.
+ */
+@Deprecated
 public NimbusClient(Map conf, String host, int port) 
throws TTransportException {
 this(conf, host, port, null, null);
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @param timeout the timeout to use when connecting.
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host, int port, 
Integer timeout) throws TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, 
null);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClientAs` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @param port the port the client is to talk to.
+ * @param timeout the timeout to use when connecting.
+ * @param asUser the name of the user you want to impersonate (use 
with caution as it is not always supported).
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host, Integer 
port, Integer timeout, String asUser) throws TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, 
asUser);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
+/**
+ * Constructor, Please try to use `getConfiguredClient` instead of 
calling this directly.
+ * @param conf the conf for the client.
+ * @param host the host the client is to talk to.
+ * @throws TTransportException on any error.
+ */
 public NimbusClient(Map conf, String host) throws 
TTransportException {
 super(conf, ThriftConnectionType.NIMBUS, host, null, null, null);
-_client = new Nimbus.Client(_protocol);
-_isLocal = false;
+client = new Nimbus.Client(_protocol);
+isLocal = false;
 }
 
 private NimbusClient(Nimbus.Iface client) {
 super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, 
"localhost", null, null, null);
-_client = client;
-_isLocal = true;
+this.client = client;
+isLocal = true;
 }
 
 /**
+ * Is the local override set or not.
  * @return true of new clients will be overridden to connect to a 
local cluster and not the configured remote cluster.
  */
 public static boolean isLocalOverride() {
 return _l

[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....

2018-09-06 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2808
  
Could you please re-check? Thanks! @HeartSaVioR @revans2 


---


[GitHub] storm pull request #2819: [STORM-3213] fix server error on system component ...

2018-09-05 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2819

[STORM-3213] fix server error on system component page on storm UI

Fix

```
2018-09-05 16:15:24.927 o.a.s.t.ProcessFunction pool-21-thread-55 [ERROR] Internal error processing getComponentPageInfo
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.daemon.nimbus.Nimbus.getComponentPageInfo(Nimbus.java:4238) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:4577) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:4556) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.thrift.ProcessFunction.process(ProcessFunction.java:38) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.SimpleTransportPlugin$SimpleWrapProcessor.process(SimpleTransportPlugin.java:169) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.thrift.server.Invocation.run(Invocation.java:18) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
at org.apache.storm.scheduler.resource.ResourceUtils.getBoltResources(ResourceUtils.java:37) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
at org.apache.storm.daemon.nimbus.Nimbus.getComponentPageInfo(Nimbus.java:4192) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
... 10 more
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-3213

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2819


commit 26a6c878ce368ddba026a2977f484523a70f3d56
Author: Ethan Li 
Date:   2018-09-05T19:31:07Z

[STORM-3213] fix server error on system component page on storm UI




---


[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....

2018-09-05 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2808
  
The failed tests should be unrelated. 


---


[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....

2018-09-04 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2808
  
Sorry, I forgot about this PR. I will do it soon. Thanks @revans2 


---


[GitHub] storm pull request #2808: [STORM-3131] Support hostname-substitution for blo...

2018-08-17 Thread Ethanlm
GitHub user Ethanlm opened a pull request:

https://github.com/apache/storm/pull/2808

[STORM-3131] Support hostname-substitution for blobstore.hdfs.principal

https://issues.apache.org/jira/browse/STORM-3131

 With this feature, we can set 
```
blobstore.hdfs.principal: /HOSTNAME@domain
```
on the nodes (nimbi, supervisors), and `HOSTNAME` will be replaced with the actual hostname of that host.
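
A minimal sketch of the substitution (illustrative, not the actual Storm utility):

```
import java.net.InetAddress;
import java.net.UnknownHostException;

class PrincipalSubstitution {
    // Replaces the literal "HOSTNAME" token in a configured principal with the
    // node's own hostname, e.g. "storm/HOSTNAME@EXAMPLE.COM" becomes
    // "storm/host1.example.com@EXAMPLE.COM" on host1.
    static String substituteHostname(String principal) throws UnknownHostException {
        String hostname = InetAddress.getLocalHost().getCanonicalHostName();
        return principal.replace("HOSTNAME", hostname);
    }
}
```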

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Ethanlm/storm STORM-3131

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2808


commit ed815638d22d976e710295e977642e3e271eff8b
Author: Ethan Li 
Date:   2018-08-17T13:26:29Z

[STORM-3131] Support hostname-substitution for blobstore.hdfs.principal




---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209348428
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4745,194 @@ public IScheduler getForcedScheduler() {
 
 }
 
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final String SUMMARY = "summary";
+
+private final Map 
clusterSummaryMetrics = new HashMap() {
+@Override
+public com.codahale.metrics.Metric put(String key, 
com.codahale.metrics.Metric value) {
+return super.put(StormMetricsRegistry.name(SUMMARY, key), 
value);
+}
+};
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+clusterSummaryMetrics.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-cpu");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-cpu");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-cpu");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-cpu");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge cachedSummary = new 
CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
--- End diff --

Better for this to be `LOG.debug`?


---


[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2789
  
sure. Take your time please.


---


[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2789
  
That's right. Will take a look at non-static registry after #2764 and #2764


---


[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2789
  
Tested it manually on a single-node cluster. `nimbus:num-shutdown-calls` is now flushed with this patch. Still need to figure out the Travis build failure before merging this in.


---


[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2789
  
I am also getting `org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred in starting fork, check output in log` on my VM, and the tests passed without this patch. Still need to investigate the issue.


---


[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2789#discussion_r209274375
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -96,4 +99,10 @@ public static void startMetricsReporters(Map topoConf) {
 throw e;
 }
 }
+
+@FunctionalInterface
+public interface Session extends AutoCloseable {
--- End diff --

Do we need this? We could have `startMetricsReporters` return the preparableReporters; that would be clearer.
But if you prefer to keep this, you might want to pick a better name, because `Session` is way too general.
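
Roughly what I mean, as a sketch (the types here are stand-ins, not the real `PreparableReporter`):

```
import java.util.List;

class MetricsStartupSketch {
    interface Reporter {
        void start();
        void stop();
    }

    // Start the reporters and hand them back, so the caller can stop/flush them
    // on shutdown without introducing a separate AutoCloseable "Session" type.
    static List<Reporter> startMetricsReporters(List<Reporter> reporters) {
        for (Reporter r : reporters) {
            r.start();
        }
        return reporters;
    }
}
```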


---


[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2789#discussion_r209269207
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 ---
@@ -18,16 +18,12 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.storm.daemon.metrics.ClientMetricsUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ConsolePreparableReporter implements PreparableReporter {
-private static final Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class);
-ConsoleReporter reporter = null;
+public class ConsolePreparableReporter extends ScheduledPreparableReporter {
 
 @Override
 public void prepare(MetricRegistry metricsRegistry, Map topoConf) {
-LOG.debug("Preparing...");
+log.debug("Preparing...");
--- End diff --

 `LOG` is used everywhere. I think it's better to use `LOG` here too


---


[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2789#discussion_r209274273
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java ---
@@ -49,7 +49,7 @@ public Pacemaker(Map conf) {
 heartbeats = new ConcurrentHashMap<>();
 this.conf = conf;
 StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size);
-StormMetricsRegistry.startMetricsReporters(conf);
+Utils.addShutdownHookWithForceKillIn1Sec(StormMetricsRegistry.startMetricsReporters(conf)::close);
--- End diff --

better to break this into two lines


---


[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2789#discussion_r209301643
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java 
---
@@ -316,7 +317,7 @@ public void launchDaemon() {
 //This will only get updated once
 StormMetricsRegistry.registerMeter("supervisor:num-launched").mark();
 StormMetricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions);
-StormMetricsRegistry.startMetricsReporters(conf);
+metricsReporters = StormMetricsRegistry.startMetricsReporters(conf);
--- End diff --

Since what `startMetricsReporters` returns here is not really the reporters, 
it's better not to use the name `metricsReporters`.


---


[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2789#discussion_r209271682
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java
 ---
@@ -22,9 +22,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class JmxPreparableReporter implements 
PreparableReporter {
+public class JmxPreparableReporter implements PreparableReporter {
--- End diff --

Why not extend `ScheduledReporter`?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r209070836
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2891,33 +2884,26 @@ public void launchServer() throws Exception {
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
 
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+//We want to update longest scheduling time in real time 
in case scheduler get stuck
+// It's normal to see some very minor jiggling in value as 
race condition may happen
 Long currTime = Time.nanoTime();
-Long startTime = schedulingStartTime.get();
-//There could be race condition here but seems trivial, 
elapsed is
-// guaranteed to be no longer than real elapsed time of 
scheduling
-Long longest = longestSchedulingTime.get();
-if (startTime != null) {
-longest = currTime - startTime > longest ? currTime - 
startTime : longest;
-}
-//To millis. How should I put the constant for magic 
numbers?
-return longest * 1e-6;
+Long startTime = schedulingStartTimeNs.get();
+return TimeUnit.NANOSECONDS.toMillis(startTime == null ?
+longestSchedulingTime.get() : Math.max(currTime - 
startTime, longestSchedulingTime.get()));
 });
 
StormMetricsRegistry.registerMeter("nimbus:num-launched").mark();
 StormMetricsRegistry.startMetricsReporters(conf);
 
-//IntelliJ suggests clusterConsumerExceutors always non null 
(unnecessary if statement)
-if (clusterConsumerExceutors != null) {
-timer.scheduleRecurring(0, 
ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
-() -> {
-try {
-if (isLeader()) {
-
sendClusterMetricsToExecutors();
-}
-} catch (Exception e) {
-throw new 
RuntimeException(e);
+timer.scheduleRecurring(0, 
ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+() -> {
+try {
+if (isLeader()) {
+
sendClusterMetricsToExecutors();
--- End diff --

Sorry, my mistake.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r209066631
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -267,8 +267,8 @@
 private static final Histogram numAddedSlotPerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling", 
new ExponentiallyDecayingReservoir());
 private static final Histogram numRemovedExecPerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling",
 new ExponentiallyDecayingReservoir());
 private static final Histogram numRemovedSlotPerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling",
 new ExponentiallyDecayingReservoir());
-private static final Histogram numNetExecChangePerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-changed-per-scheduling",
 new ExponentiallyDecayingReservoir());
-private static final Histogram numNetSlotChangePerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-changed-per-scheduling",
 new ExponentiallyDecayingReservoir());
+private static final Histogram numNetExecIncreasePerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-changed-per-scheduling",
 new ExponentiallyDecayingReservoir());
+private static final Histogram numNetSlotIncreasePerScheduling = 
StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-changed-per-scheduling",
 new ExponentiallyDecayingReservoir());
--- End diff --

better to change `nimbus:num-net-slots-changed-per-scheduling` too


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r209067011
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2891,33 +2884,26 @@ public void launchServer() throws Exception {
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
 
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+//We want to update longest scheduling time in real time 
in case scheduler get stuck
+// It's normal to see some very minor jiggling in value as 
race condition may happen
 Long currTime = Time.nanoTime();
-Long startTime = schedulingStartTime.get();
-//There could be race condition here but seems trivial, 
elapsed is
-// guaranteed to be no longer than real elapsed time of 
scheduling
-Long longest = longestSchedulingTime.get();
-if (startTime != null) {
-longest = currTime - startTime > longest ? currTime - 
startTime : longest;
-}
-//To millis. How should I put the constant for magic 
numbers?
-return longest * 1e-6;
+Long startTime = schedulingStartTimeNs.get();
+return TimeUnit.NANOSECONDS.toMillis(startTime == null ?
+longestSchedulingTime.get() : Math.max(currTime - 
startTime, longestSchedulingTime.get()));
 });
 
StormMetricsRegistry.registerMeter("nimbus:num-launched").mark();
 StormMetricsRegistry.startMetricsReporters(conf);
 
-//IntelliJ suggests clusterConsumerExceutors always non null 
(unnecessary if statement)
-if (clusterConsumerExceutors != null) {
-timer.scheduleRecurring(0, 
ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
-() -> {
-try {
-if (isLeader()) {
-
sendClusterMetricsToExecutors();
-}
-} catch (Exception e) {
-throw new 
RuntimeException(e);
+timer.scheduleRecurring(0, 
ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+() -> {
+try {
+if (isLeader()) {
+
sendClusterMetricsToExecutors();
--- End diff --

looks like a typo here?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209016305
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map ported = 
new HashMap<>(PORTED_METRICS);
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge cachedSummary = new 
CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208965124
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
--- End diff --

You can file a separate JIRA to discuss it and remove it from this PR.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208987159
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map ported = 
new HashMap<>(PORTED_METRICS);
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
--- End diff --

use lowercase for "CPU" to be consistent with other names?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208984273
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2871,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
+// Num supervisor, and fragmented resources have been included 
in cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
--- End diff --

why do we need `non-negative`


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209007420
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
--- End diff --

this is not used


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208986783
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map ported = 
new HashMap<>(PORTED_METRICS);
--- End diff --

`ported` is not clear out of the context of this PR. Better to use something 
like `clusterSummaryMetrics`.
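
For example (just a sketch; the value type here is my guess):

```java
// Same map, with a name that says what it actually holds.
private final Map<String, Metric> clusterSummaryMetrics = new HashMap<>();
```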


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208965019
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2918,6 +2943,10 @@ public void launchServer() throws Exception {
 }
 });
 }
+
+//Should we make the delaySecs and recurSecs in sync with any 
conf value?
+// They should be around the reporting interval, but it's not 
configurable
+timer.scheduleRecurring(5, 5, clusterMetricSet);
--- End diff --

Why do we set these to 5 and 5?
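
If they should track the reporting interval, one possible direction is below. 
This is illustrative only; it reuses the cluster metrics consumer publish 
interval, which may or may not be the right knob:

```java
// Drive both the delay and the recurrence from an existing interval config
// instead of hard-coding 5 and 5.
int intervalSecs = ObjectReader.getInt(
    conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS));
timer.scheduleRecurring(intervalSecs, intervalSecs, clusterMetricSet);
```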


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208988762
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map ported = 
new HashMap<>(PORTED_METRICS);
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
--- End diff --

The assertion is good. Just curious in which case this would happen.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208991078
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map ported = 
new HashMap<>(PORTED_METRICS);
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge cachedSummary = new 
CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208986545
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
--- End diff --

We don't need a constant here since it's only the initial capacity of the map.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r209005776
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
--- End diff --

We can discuss it in a separate JIRA.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208723928
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2288,16 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 newAssignments.put(topoId, newAssignment);
 }
 
-if (!newAssignments.equals(existingAssignments)) {
+boolean assignmentChanged = 
inspectSchduling(existingAssignments, newAssignments);
+if (assignmentChanged) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
-LOG.info("Fragmentation after scheduling is: {} MB, {} 
PCore CPUs", fragmentedMemory(), fragmentedCpu());
-nodeIdToResources.get().forEach((id, node) ->
-LOG.info(
-"Node Id: {} Total 
Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-+ "CPU: {}, 
Available CPU: {}, fragmented: {}",
-id, 
node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-
node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), 
isFragmented(node)));
 idToResources.set(new HashMap<>());
 idToWorkerResources.set(new HashMap<>());
 }
 
 //tasks figure out what tasks to talk to by looking at 
topology at runtime
 // only log/set when there's been a change to the assignment
+// TODO: why do we have loop fission here
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208707067
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
 ---
@@ -134,6 +150,7 @@ public Response daemonLog(@Context HttpServletRequest 
request) throws IOExceptio
  */
 @GET
 @Path("/searchLogs")
+//Seems redundant
--- End diff --

remove


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208723798
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2057,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208691016
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
--- End diff --

comment seems off


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208726809
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
+ * @param startByteOffset number of byte to be ignored in each log file
+ * @param targetStr searched string
+ * @return all matched results
+ */
 @VisibleForTesting
-Matched findNMatches(List logs, int numMatches, int fileOffset, 
int offset, String search) {
+Matched findNMatches(List logs, int numMatches, int fileOffset, 
int startByteOffset, String targetStr) {
 logs = drop(logs, fileOffset);
+LOG.debug("{} files to scan", logs.size());
 
 List> matches = new ArrayList<>();
 int matchCount = 0;
+int scannedFiles = 0;
 
+//TODO: Unnecessarily convoluted loop that should be optimized
 while (true) {
 if (logs.isEmpty()) {
 break;
 }
 
 File firstLog = logs.get(0);
-Map theseMatches;
+Map matchInLog;
 try {
 LOG.debug("Looking through {}", firstLog);
-theseMatches = substringSearch(firstLog, search, 
numMatches - matchCount, offset);
+matchInLog = substringSearch(firstLog, targetStr, 
numMatches - matchCount, startByteOffset);
+scannedFiles++;
 } catch (InvalidRequestException e) {
 LOG.error("Can't search past end of file.", e);
-theseMatches = new HashMap<>();
+matchInLog = new HashMap<>();
 }
 
 String fileName = 
WorkerLogs.getTopologyPortWorkerLog(firstLog);
 
+//This section simply put the formatted log filename and 
corresponding port in the matching.
 final List> newMatches = new 
ArrayList<>(matches);
-Map currentFileMatch = new 
HashMap<>(theseMatches);
+Map currentFileMatch = new 
HashMap<>(matchInLog);
 currentFileMatch.put("fileName", fileName);
 Path firstLogAbsPath;
 try {
 firstLogAbsPath = firstLog.getCanonicalFile().toPath();
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
+//Why do we need to start from scratch to retrieve just the 
port here?
 currentFileMatch.put("port", 
truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
 newMatches.add(currentFileMatch);
 
-int newCount = matchCount + 
((List)theseMatches.get("matches")).size();
+int newCount = matchCount + 
((List)matchInLog.get("matches")).size();
 
-//theseMatches is never empty! As guaranteed by the 
#get().size() method above
+//matchInLog is never empty! As guaranteed by the 
#get().size() method above
 if (newCount == matchCount) {
 // matches and matchCount is not changed
 logs = rest(logs);
-offset = 0;
+startByteOffset = 0;
 fileOffset = fileOffset + 1;
 } else if (newCount >= numMatches) {
 matches = newMatches;
 break;
 } else {
 matches = newMatches;
 logs = rest(logs);
-offset = 0;
+startByteOffset = 0;
 fileOffset = fileOffset + 1;
 matchCount = newCount;
 }
 }
 
-return new Matched(fileOffset, search, matches);
+LOG.info("scanned {} files", scannedFiles);
+//fileOffset is not being used and it behaves inconsistently 
(showing
+// (index of files search ends on - 1) if [enough matches] else 
(index of files search ends on))
+// I don't think we should expose the data to public if it's not 
used.
+// can I dropped this field or change its behavior so it's used 
for metrics [numScannedFiles]?
--- End diff --

You can file a separate JIRA for this.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208705884
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
 ---
@@ -55,6 +63,8 @@ public Response downloadFile(String fileName, String 
user, boolean isDaemon) thr
 File file = new File(rootDir, fileName).getCanonicalFile();
 if (file.exists()) {
 if (isDaemon || 
resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
+//How should I put the constant for magic numbers?
--- End diff --

It's fine here.  


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208704852
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
 ---
@@ -223,8 +246,8 @@ void cleanupEmptyTopoDirectory(File dir) throws 
IOException {
 
 @VisibleForTesting
 FileFilter mkFileFilterForLogCleanup(long nowMillis) {
-final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
-return file -> !file.isFile() && 
lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+//Doesn't it make more sense to do file.isDirectory here?
--- End diff --

I agree. Also the name of this method is not clear
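
Roughly what that would look like (a sketch rather than a drop-in change, since 
`!isFile()` and `isDirectory()` are not strictly equivalent for paths that no 
longer exist):

```java
@VisibleForTesting
FileFilter mkFileFilterForLogCleanup(long nowMillis) {
    final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
    // Only worker log directories that are older than the cutoff are eligible for cleanup.
    return file -> file.isDirectory() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
}
```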


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208706459
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
 ---
@@ -88,9 +87,14 @@ public void setLogFilePermission(String fileName) throws 
IOException {
 
 if (runAsUser && topoOwner.isPresent() && file.exists() && 
!Files.isReadable(file.toPath())) {
 LOG.debug("Setting permissions on file {} with topo-owner {}", 
fileName, topoOwner);
-ClientSupervisorUtils.processLauncherAndWait(stormConf, 
topoOwner.get(),
-Lists.newArrayList("blob", file.getCanonicalPath()), 
null,
-"setup group read permissions for file: " + fileName);
+try {
+ClientSupervisorUtils.processLauncherAndWait(stormConf, 
topoOwner.get(),
+Lists.newArrayList("blob", 
file.getCanonicalPath()), null,
+"setup group read permissions for file: " + 
fileName);
+} catch (IOException e) {
+ExceptionMeters.NUM_PERMISSION_EXCEPTIONS.mark();
--- End diff --

The exception name is not clear


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208703040
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
 ---
@@ -186,7 +192,22 @@ private boolean isFileEligibleToSkipDelete(boolean 
forPerDir, Set active
 break;
 }
 }
+} catch (IOException e) {
+ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
--- End diff --

better to use `NUM_FILE_SCANNED_EXCEPTIONS`?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208719630
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment 
assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.si

[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208689498
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -71,6 +78,11 @@
 
 public class LogviewerLogSearchHandler {
 private static final Logger LOG = 
LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+private static final Meter numDeepSearchNoResult = 
StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+private static final Histogram numFilesOpenedDeepSearch = 
StormMetricsRegistry.registerHistogram(
+"logviewer:num-files-opened-deep-search", new 
ExponentiallyDecayingReservoir());
--- End diff --

opened --> scanned? That would be clearer.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208691270
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
+ * @param startByteOffset number of byte to be ignored in each log file
+ * @param targetStr searched string
+ * @return all matched results
+ */
 @VisibleForTesting
-Matched findNMatches(List logs, int numMatches, int fileOffset, 
int offset, String search) {
+Matched findNMatches(List logs, int numMatches, int fileOffset, 
int startByteOffset, String targetStr) {
 logs = drop(logs, fileOffset);
+LOG.debug("{} files to scan", logs.size());
 
 List> matches = new ArrayList<>();
 int matchCount = 0;
+int scannedFiles = 0;
 
+//TODO: Unnecessarily convoluted loop that should be optimized
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208718349
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment 
assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.si

[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208703547
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
 ---
@@ -95,6 +102,9 @@ public LogCleaner(Map stormConf, 
WorkerLogs workerLogs, Director
 
 LOG.info("configured max total size of worker logs: {} MB, max 
total size of worker logs per directory: {} MB",
 maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);
+//Switch to CachedGauge if this starts to hurt performance
+// 
https://stackoverflow.com/questions/5857199/how-to-find-out-the-size-of-file-and-directory-in-java-without-creating-the-obje
--- End diff --

remove link


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208651482
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
 ---
@@ -265,32 +269,26 @@ public Response daemonLogPage(String fileName, 
Integer start, Integer length, St
 String rootDir = daemonLogRoot;
 File file = new File(rootDir, fileName).getCanonicalFile();
 String path = file.getCanonicalPath();
-boolean isZipFile = path.endsWith(".gz");
 
 if (file.exists() && new 
File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
 // all types of files included
 List logFiles = Arrays.stream(new 
File(rootDir).listFiles())
 .filter(File::isFile)
 .collect(toList());
 
-List filesStrWithoutFileParam = logFiles.stream()
-.map(File::getName).filter(fName -> 
!StringUtils.equals(fileName, fName)).collect(toList());
-
-List reorderedFilesStr = new ArrayList<>();
-reorderedFilesStr.addAll(filesStrWithoutFileParam);
+List reorderedFilesStr = logFiles.stream()
+.map(File::getName).filter(fName -> 
!StringUtils.equals(fileName, fName)).collect(toList());
 reorderedFilesStr.add(fileName);
 
 length = length != null ? Math.min(10485760, length) : 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-String logString;
-if (isTxtFile(fileName)) {
-logString = escapeHtml(start != null ? pageFile(path, 
start, length) : pageFile(path, length));
-} else {
-logString = escapeHtml("This is a binary file and cannot 
display! You may download the full file.");
+final boolean isZipFile = path.endsWith(".gz");
+long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : 
file.length();
--- End diff --

why not reuse `getFileLength`


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208377660
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -48,6 +53,30 @@ public static Meter registerMeter(String name) {
 return REGISTRY.register(name, new Meter());
 }
 
+//Change the name to avoid name conflict in future Metrics release
--- End diff --

change the name?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208394271
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
 ---
@@ -193,24 +198,23 @@ public Response logPage(String fileName, Integer 
start, Integer length, String g
 throw e.getCause();
 }
 
-List filesStrWithoutFileParam = 
logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
-.filter(fileStr -> !StringUtils.equals(fileName, 
fileStr)).collect(toList());
-
-List reorderedFilesStr = new ArrayList<>();
-reorderedFilesStr.addAll(filesStrWithoutFileParam);
+List reorderedFilesStr = 
logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
+.filter(fileStr -> !StringUtils.equals(fileName, 
fileStr)).collect(toList());
 reorderedFilesStr.add(fileName);
 
 length = length != null ? Math.min(10485760, length) : 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-String logString;
-if (isTxtFile(fileName)) {
-logString = escapeHtml(start != null ? pageFile(path, 
start, length) : pageFile(path, length));
-} else {
-logString = escapeHtml("This is a binary file and 
cannot display! You may download the full file.");
+//This is the same as what #pageFile(String path, Integer 
tail) does
+// boolean isZipFile = path.endsWith(".gz");
+// long fileLength = isZipFile ? 
ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+// return pageFile(path, Long.valueOf(fileLength - 
tail).intValue(), tail);
--- End diff --

remove comments


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208702901
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
 ---
@@ -124,6 +128,7 @@ public int deleteOldestWhileTooLarge(List dirs,
 File file = pq.poll();
 stack.push(file);
 }
+LOG.debug("pq: {}, stack: {}", pq, stack);
--- End diff --

do we still need this?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208710506
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2890,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

fine here. Or you can use a constant. 
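
For instance, letting `TimeUnit` do the conversion avoids the magic number 
altogether (a sketch based on the snippet above):

```java
// Convert the elapsed nanoseconds to milliseconds without a 1e-6 literal.
return TimeUnit.NANOSECONDS.toMillis(longest);
```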


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208377360
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
 ---
@@ -51,6 +51,7 @@ public void run() {
 private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
 Map localHeartbeats;
 try {
+//TODO: This call no longer throws exceptions, do we still 
want to wrap it in try catch block?
--- End diff --

remove comment


---


[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2743
  
merged in https://github.com/apache/storm/pull/2710


---


[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2710
  
@zd-project  Could you squash all the commits? Will merge this in.


---


[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2710
  
@HeartSaVioR @danny0405 Do you want to take a look again? Otherwise I think 
it's good to merge it.


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208340502
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -365,6 +384,7 @@ void addReferencesToBlobs(PortAndAssignment pna, 
BlobChangingCallback cb)
 if (!localResourceList.isEmpty()) {
 getBlobs(localResourceList, pna, cb);
 }
+pna.complete();
--- End diff --

Just one more nit: put this call outside of this method, since the method name 
`addReferencesToBlobs` doesn't suggest `complete`.


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208279317
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java
 ---
@@ -28,11 +29,14 @@
 import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ShellUtils;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ClientSupervisorUtils {
+public static final Meter numWorkerLaunchExceptions = 
ShellUtils.numShellExceptions;
--- End diff --

I would like to see some comments so people can understand why 
`ClientSupervisorUtils` is using the same meter as `ShellUtils`.


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208092337
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
@@ -346,6 +358,10 @@ public void cleanUp() throws IOException {
 _usedMemory.remove(_port);
 _reservedMemory.remove(_port);
 cleanUpForRestart();
+} catch (IOException e) {
+//This may or may not be reported depending on when process 
exits
--- End diff --

I would like to better understand this. In which case will the meter not be 
reported?


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208095554
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -1008,6 +1015,13 @@ public void close() throws Exception {
 }
 
 static class DynamicState {
+private static final Map 
workerStateTransition = EnumUtil.toEnumMap(MachineState.class,
+machineState -> 
StormMetricsRegistry.registerMeter("supervisor:num-transitions-into-" + 
machineState.toString()));
+private static final Map workerStateDuration 
= EnumUtil.toEnumMap(MachineState.class,
+machineState -> StormMetricsRegistry.registerTimer(
+"supervisor:num-transitions-out-" + 
machineState.toString() + "-and-duration-ms")
--- End diff --

It would be good if we could find a better name.


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208279468
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -251,11 +261,18 @@ private LocalizedResource getUserFile(String user, 
String key) {
 long localVersion = blob.getLocalVersion();
 long remoteVersion = 
blob.getRemoteVersion(blobStore);
 if (localVersion != remoteVersion || 
!blob.isFullyDownloaded()) {
+if (!blob.isFullyDownloaded()) {
--- End diff --

Should be `if (!blob.isFullyDownloaded()) {`


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-07 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2710#discussion_r208096352
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -1147,8 +1162,19 @@ public DynamicState 
withPendingLocalization(Future pendingDownload) {
  */
 public DynamicState withState(final MachineState state) {
 long newStartTime = Time.currentTimeMillis();
+//We may (though unlikely) lose metering here if state 
transition is too frequent (less than a millisecond)
+workerStateDuration.get(this.state).update(newStartTime - 
startTime, TimeUnit.MILLISECONDS);
+workerStateTransition.get(state).mark();
+
+LocalAssignment assignment = this.currentAssignment;
+if (this.state != MachineState.RUNNING && state == 
MachineState.RUNNING
+&& this.currentAssignment instanceof 
TimerDecoratedAssignment) {
+((TimerDecoratedAssignment) assignment).stopTiming();
+assignment = new LocalAssignment(this.currentAssignment);
--- End diff --

A few comments would be helpful.


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207665264
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static  T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public  T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

It is fine. But I would like to see more javadoc for this method.
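
For example, something like the javadoc below (the wording is only my guess at the semantics based on the `getOrAdd` comment, so please adjust it to what the method actually does):
```java
/**
 * Registers {@code metric} under {@code name}, or returns the metric already registered
 * under that name.
 *
 * <p>Unlike {@link com.codahale.metrics.MetricRegistry#register(String, Metric)}, which fails
 * on a duplicate name, this behaves more like {@code getOrAdd}: if a metric of the same type
 * is already registered under {@code name}, the existing instance is returned instead.
 *
 * @param name   name to register the metric under
 * @param metric metric to register if the name is not taken yet
 * @return the metric now associated with {@code name}
 * @throws IllegalArgumentException if a metric of a different type is already registered under the name
 */
```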


---


[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...

2018-08-03 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2771#discussion_r207594931
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) {
 }
 }
 
-private static  T register(final String name, T 
metric) {
-T ret;
+@Override
+//This is more similar to super#getOrAdd than super#register
+public  T register(final String name, T metric) 
throws IllegalArgumentException {
--- End diff --

If the metric is a MetricSet, it will have the same issue as the old code.


---


[GitHub] storm pull request #2763: STORM-3150: Improve Gauge registration methods and...

2018-07-26 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2763#discussion_r205500685
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -27,52 +28,63 @@
 
 @SuppressWarnings("unchecked")
 public class StormMetricsRegistry {
-public static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
+private static final MetricRegistry DEFAULT_REGISTRY = new 
MetricRegistry();
 private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricsRegistry.class);
 
-public static Meter registerMeter(String name) {
-Meter meter = new Meter();
-return register(name, meter);
+public static Meter registerMeter(final String name) {
+return register(name, new Meter());
 }
 
-// TODO: should replace Callable to Gauge when nimbus.clj is 
translated to java
-public static Gauge registerGauge(final String name, final 
Callable fn) {
-Gauge gauge = new Gauge() {
-@Override
-public Integer getValue() {
-try {
-return (Integer) fn.call();
-} catch (Exception e) {
-LOG.error("Error getting gauge value for {}", name, e);
-}
-return 0;
+/**
+ * Register a gauge with provided callback.
+ * @param name name of the gauge
+ * @param fn callback that measures
+ * @param  type of measurement the callback returns, also the type 
of gauge
+ * @return registered gauge
+ */
+public static  Gauge registerGauge(final String name, final 
Callable fn) {
+return register(name, () -> {
+try {
+return fn.call();
+} catch (Exception e) {
+LOG.error("Error getting gauge value for {}", name, e);
 }
-};
-return register(name, gauge);
+return null;
+});
 }
 
-public static void registerProvidedGauge(final String name, Gauge 
gauge) {
+/**
+ * Register a provided gauge. Use this method if custom gauges is 
needed or
+ * no checked exceptions should be handled.
+ * @param name name of the gauge
+ * @param gauge gauge
+ * @param  type of value the gauge measures
+ */
+public static  void registerProvidedGauge(final String name, final 
Gauge gauge) {
 register(name, gauge);
 }
 
 public static Histogram registerHistogram(String name, Reservoir 
reservoir) {
-Histogram histogram = new Histogram(reservoir);
-return register(name, histogram);
+return register(name, new Histogram(reservoir));
+}
+
+public static void registerAll(final String prefix, MetricSet metrics) 
{
--- End diff --

I like the idea of having `registerMetricSet` and `unregisterMetricSet` in 
#2771 better. 

But to make the commit history easier to understand, I would suggest one of 
the following options:
1. Put the changes in the patch that actually uses these methods.
2. Port the `registerMetricSet` and `unregisterMetricSet` changes from #2771 here, to avoid changing the same methods multiple times.

I prefer option 1.


---


[GitHub] storm issue #2714: STORM-3101: Fix unexpected metrics registration in StormM...

2018-06-14 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2714
  
Talked with @zd-project offline; I now have a better understanding of the problem he is trying to solve, as explained in 
https://issues.apache.org/jira/browse/STORM-3101. It's a really good catch. 
But we can discuss more about how to address it.


---


[GitHub] storm issue #2714: STORM-3101: Add filter to StormMetricsRegistry.

2018-06-14 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2714
  
Thanks for the contribution. It's good to make more use of the metrics 
functionality. 

I don't see a real need for adding filters since it's all Storm internal 
code. But if you think it's better to have them, I would suggest using a filter 
layer to make this more flexible. 

For example, 

Create a filter interface with a filter function, e.g.:
```
public interface IStormMetricsRegistryFilter {
    public default boolean filter(String metricName) {
        return false;
    }
}
```
Then you can have a function (e.g. `addFilter`) in StormMetricsRegistryFilter 
to add a real implementation of the filter interface before StormMetricsRegistry 
starts registering meters.

The above is a very simple interface and might not be able to do much 
beyond filtering on the `String` parameter. You can think more about this.
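
A rough sketch of how the pieces could fit together (the class and method names below, like `FilteredMetricsRegistry` and `addFilter`, are just placeholders I made up, not the actual Storm API):
```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class FilteredMetricsRegistry {
    private static final MetricRegistry REGISTRY = new MetricRegistry();
    private static final List<IStormMetricsRegistryFilter> FILTERS = new CopyOnWriteArrayList<>();

    /** Daemons install filters here before any metrics get registered. */
    public static void addFilter(IStormMetricsRegistryFilter filter) {
        FILTERS.add(filter);
    }

    public static Meter registerMeter(String name) {
        // If any installed filter rejects the name, hand back an unregistered meter so callers
        // can keep marking it without null checks; it just won't be reported anywhere.
        for (IStormMetricsRegistryFilter filter : FILTERS) {
            if (filter.filter(name)) {
                return new Meter();
            }
        }
        return REGISTRY.meter(name);
    }
}
```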

I think the current implementation in this PR won't work because you are 
calling `setsource()` too late.


---


[GitHub] storm issue #2718: STORM-3103 allow nimbus to shutdown properly

2018-06-14 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2718
  
The code change makes sense to me. Just wondering if the whole JVM is 
terminated immediately after exitProcess() is called. 
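
For context, the distinction I have in mind (a plain-Java illustration, not the Storm code):
```java
public class ExitDemo {
    public static void main(String[] args) {
        // Shutdown hooks run on System.exit(), so termination is not necessarily "immediate".
        Runtime.getRuntime().addShutdownHook(new Thread(() -> System.out.println("hook ran")));

        System.exit(0);                  // runs registered shutdown hooks first, then exits
        // Runtime.getRuntime().halt(0); // by contrast, this terminates at once, skipping hooks
    }
}
```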


---


[GitHub] storm issue #2700: [STORM-3093] Cache the storm id to executors mapping on m...

2018-06-04 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2700
  
@danny0405 No, it's fine. Thanks for the contribution. 


---


[GitHub] storm issue #2700: [STORM-3093] Cache the storm id to executors mapping on m...

2018-06-04 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2700
  
Looks good. Do you have any performance test results to share, so that we 
can get an idea of how much performance gain this patch brings? It's fine if you 
don't have them.


---

