[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240657555 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- sorry I was wrong ---
[GitHub] storm pull request #2925: STORM-3302: Ensures we close streams to HDFS
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2925#discussion_r240642462 --- Diff: storm-client/src/jvm/org/apache/storm/blobstore/BlobStore.java --- @@ -288,13 +288,15 @@ public void createBlob(String key, InputStream in, SettableBlobMeta meta, Subjec while ((len = in.read(buffer)) > 0) { out.write(buffer, 0, len); } -out.close(); -} catch (AuthorizationException | IOException | RuntimeException e) { --- End diff -- It looks to me that the exception is still caught instead of thrown? ---
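For reference, the fix being discussed converges on the try-with-resources pattern. The sketch below is illustrative only; `StreamCopySketch` and `copyAndClose` are hypothetical names, not Storm's actual API:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of the pattern under review: with try-with-resources,
// `out` is closed even if the copy loop throws, and the exception propagates
// to the caller instead of being swallowed by a catch block.
class StreamCopySketch {
    static long copyAndClose(InputStream in, OutputStream out) throws IOException {
        long total = 0;
        try (OutputStream managed = out) {
            byte[] buffer = new byte[4096];
            int len;
            while ((len = in.read(buffer)) > 0) {
                managed.write(buffer, 0, len);
                total += len;
            }
        } // managed.close() runs here on both success and failure
        return total;
    }
}
```

Note that an explicit `out.close()` after the loop, as in the original code, is skipped whenever `read` or `write` throws, which is exactly the leak the PR title describes.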
[GitHub] storm pull request #2902: [STORM-3282] Fix RAS worker count estimation
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2902#discussion_r231581587 --- Diff: storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java --- @@ -691,8 +691,10 @@ public static boolean isRAS(Map conf) { public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology) throws InvalidTopologyException { -return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) / - ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB))); +Double defaultWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)); +Double topologyWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); --- End diff -- Oh. ok. If there is a default value in yaml file, then that's fine. I think we don't want to have to maintain two default values both in default.yaml and also in the code. ---
[GitHub] storm pull request #2902: [STORM-3282] Fix RAS worker count estimation
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2902#discussion_r231542667 --- Diff: storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java --- @@ -691,8 +691,10 @@ public static boolean isRAS(Map conf) { public static int getEstimatedWorkerCountForRASTopo(Map topoConf, StormTopology topology) throws InvalidTopologyException { -return (int) Math.ceil(getEstimatedTotalHeapMemoryRequiredByTopo(topoConf, topology) / - ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB))); +Double defaultWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.WORKER_HEAP_MEMORY_MB)); +Double topologyWorkerMaxHeap = ObjectReader.getDouble(topoConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)); --- End diff -- Will `topoConf.get(Config.WORKER_HEAP_MEMORY_MB)` be null? If so, this will throw exception `IllegalArgumentException("Don't know how to convert null to double");` ---
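The null-safety concern above can be illustrated with a defensive lookup. This is a sketch with hypothetical names (`ConfigDoubleSketch`, `getDoubleOrDefault`), not Storm's `ObjectReader`, which is the class that throws when handed null:

```java
import java.util.Map;

// Hypothetical sketch of the concern raised above: if the config key is
// absent, converting null to double should fall back to a default instead
// of throwing IllegalArgumentException("Don't know how to convert null to double").
class ConfigDoubleSketch {
    static double getDoubleOrDefault(Map<String, Object> conf, String key, double fallback) {
        Object raw = conf.get(key);
        if (raw == null) {
            return fallback; // avoid the null-to-double conversion error
        }
        return ((Number) raw).doubleValue();
    }
}
```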
[GitHub] storm pull request #2901: [STORM-3271] Docker support: launch storm workers ...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2901 [STORM-3271] Docker support: launch storm workers in docker containers Spent a lot of effort on this. This patch lets the storm supervisor launch workers in docker containers so that user code is isolated from other topologies and from the host machine. This is a security enhancement. A detailed explanation is available at docs/Docker-support.md in this PR. I tested it manually (only on `RHEL7`, not on other operating systems) and with a secure cluster setup. I also ran our own integration test suites with docker support turned off and verified that this PR won't break storm. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm STORM-3271 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2901.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2901 commit ba8b7cf80347fe077e097206023adb3326c8b4ad Author: Ethan Li Date: 2018-10-12T23:24:15Z refactor supervisor source code commit 2387da9ae554ed81573e35545992754bc1454c9d Author: Ethan Li Date: 2018-10-12T23:24:23Z add docker support commit b130bcb649f6adcb1fef588dc7033361999a9d0f Author: Ethan Li Date: 2018-10-15T15:27:39Z add cgrouproot and nscd bind mounts add user's groups and simply getGroupIdInfo and getUserIdInfo functions commit be2efc833144004c126ff150f9676adc15575549 Author: Ethan Li Date: 2018-10-16T13:48:16Z address some review comments commit 80f3296688fce2bec9692a2699ae396a32c8e02e Author: Ethan Li Date: 2018-10-16T22:04:15Z fix worker-launcher for secure storm cluster commit d4e2127ef9664c0398f2e85c26d0dde5734d3444 Author: Ethan Li Date: 2018-10-17T14:35:19Z launch docker container in attached mode commit 8e4c3c7a3cfe2a38dc847ea2ebda2ce74f9a45ff Author: Ethan Li Date: 2018-10-17T16:05:18Z check 
prerequisites in prepare method; bind mount workerArtifact more precisely commit e0131136fd864aa5bc4a3b8e40b538d04a1c36ae Author: Ethan Li Date: 2018-10-19T19:57:24Z fix user could be null (e.g when supervisor restarts) commit 5dfcd9ff8869f6a4334ccecdefb0f5c1f40ea9fb Author: Ethan Li Date: 2018-10-22T20:24:20Z fix typo; make network type non-configurable commit fa5e2999312258b4da0b1936591804780bd8ec8e Author: Ethan Li Date: 2018-10-23T19:55:45Z make worker-launcher more secure commit 0f08bcde454f7a2e613eb673ba697236d8122999 Author: Ethan Li Date: 2018-10-24T16:28:24Z make nsenter more secure commit 721c6b85e38b87761f0a52c19cd9b684bf70cbba Author: Ethan Li Date: 2018-10-24T20:10:56Z add more comments; add list of readonly bindmounts commit be88fb7c174ab15e22ab99590530919499fe9d38 Author: Ethan Li Date: 2018-10-24T20:53:10Z making nsenter safer commit 1d84bf7d86cfa7e3c603ef4f99e0e6c4e2ec9ede Author: Ethan Li Date: 2018-10-25T14:23:18Z add a whitelist for docker images commit 9c8ae38bdbc31d897eca9f1779d96d5b995fcec6 Author: Ethan Li Date: 2018-10-25T21:20:39Z check every image in the whitelist; prevent memory leark from worker-launcher; fix format and trivial issues. 
commit 52350f4363821ac14a5b7b2114bc1bddb204299a Author: Ethan Li Date: 2018-10-26T20:18:25Z launch docker container in background; use docker-wait to get the exitCode if the contaner exits commit 6c80da77481cccabd047452b055ab3cd68a67c40 Author: Ethan Li Date: 2018-10-30T20:46:26Z add a cgroup sub path template to support various cgroup setup commit 8b9c18f314c1f8bf12140631f7de052bfc35cff5 Author: Ethan Li Date: 2018-10-31T20:19:13Z add unit tests for docker commands commit 05b31e875536f01b3fb61a378ce9de3c01d4a5d3 Author: Ethan Li Date: 2018-11-01T13:51:00Z use nsenter to do profiling; mount shared_by_topology/tmp to /tmp commit a1ffe50265f446b68dac0350c8cb4c49bf05992b Author: Ethan Li Date: 2018-11-01T15:02:12Z fix loging for storm.resource.isolation.plugin commit c4647183092541dc5957441500f62719510689f5 Author: Ethan Li Date: 2018-11-01T17:38:54Z fix issues when DockerManager is not used commit 190f06938a38f4225d94f86c2f1afbba068d1b22 Author: Ethan Li Date: 2018-11-05T16:54:32Z add documentation ---
[GitHub] storm issue #2899: [STORM-3265] flight.bash fall back to use java utils dire...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2899 Last commit prepended "[STORM-3265]" to the commit message ---
[GitHub] storm pull request #2900: [STORM-3264] fix "local variable referenced before...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2900 [STORM-3264] fix "local variable referenced before assignment" in storm.py minor fix. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm patch-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2900.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2900 commit d0dc3b4bd86657f2be13c1c0cdae2e5933da9eb1 Author: Meng Li (Ethan) Date: 2018-11-01T14:08:15Z [STORM-3264] fix "local variable referenced before assignment" in storm.py ---
[GitHub] storm pull request #2899: flight.bash fall back to use java utils directly i...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2899 flight.bash falls back to using java utils directly if $JAVA_HOME is not set or java is not available in $BINPATH https://issues.apache.org/jira/browse/STORM-3265 If JAVA_HOME is not set, or `/usr/bin/java` is not available, we should just use java directly. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm STORM-3265 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2899.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2899 commit 686ef4b4063381c57ce71f05296fcb1ffe0badf9 Author: Ethan Li Date: 2018-11-01T13:56:57Z flight.bash fall back to use java utils directly if $JAVA_HOME not set or java not avaialble in $BINPATH ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r229880617 --- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) { } } +public static class NumaEntryValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if (o == null) { +return; +} +Map numa = (Map) o; +for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) { +if (!numa.containsKey(key)) { +throw new IllegalArgumentException( +"The numa configuration key [" + key + "] is missing!" +); +} +} + +List cores = (List) numa.get(NUMA_CORES); +Set coreSet = new HashSet(); +coreSet.addAll(cores); +if (coreSet.size() != cores.size()) { --- End diff -- Thanks for adding this. Do we care about duplicate cores across the numa zones? (I don't think we currently check this.) For example: ``` numaid=0, cores=[0,1,2,3] numaid=1, cores=[0,1,2,3] ``` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r229879326 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java --- @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) { return ret; } +/** + * Extracting out to mock it for tests. + * @return true if on Linux. + */ +protected boolean isOnLinux() { +return SystemUtils.IS_OS_LINUX; +} + +private void prefixNumaPinningIfApplicable(String numaId, List commandList) { +if (numaId != null) { +if (isOnLinux()) { +commandList.add("numactl"); +commandList.add("--i=" + numaId); --- End diff -- sorry to keep beating you on this :D How about: ``` commandList.add(0, "numactl"); commandList.add(1, "--i=" + numaId); ``` since we want to prefix `commandList` whether it's empty or not ---
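The distinction behind this suggestion is `add(e)` (append) versus `add(0, e)` (insert at the front). A minimal sketch, with hypothetical names (`NumaPrefixSketch`, `prefixNumactl`) and the `"--i="` flag string copied verbatim from the diff:

```java
import java.util.List;

// Sketch of the prefixing suggestion above: add(0, ...) and add(1, ...)
// place "numactl" and its flag before any commands already in the list,
// whereas plain add(...) would append them to the end.
class NumaPrefixSketch {
    static void prefixNumactl(String numaId, List<String> commandList) {
        commandList.add(0, "numactl");
        commandList.add(1, "--i=" + numaId); // flag string taken as-is from the diff
    }
}
```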
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r229876743 --- Diff: storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java --- @@ -414,6 +418,34 @@ public void validateInteger(String name, Object o) { } } +public static class NumaEntryValidator extends Validator { + +@Override +public void validateField(String name, Object o) { +if (o == null) { +return; +} +Map numa = (Map) o; +for (String key : new String[]{NUMA_CORES, NUMA_MEMORY_IN_MB, NUMA_PORTS}) { +if (!numa.containsKey(key)) { +throw new IllegalArgumentException( +"The numa configuration key [" + key + "] is missing!" +); +} +} + +List cores = (List) numa.get(NUMA_CORES); +Set coreSet = new HashSet(); +coreSet.addAll(cores); +if (coreSet.size() != cores.size()) { +throw new IllegalArgumentException( +"No duplicate cores in NUMA config" --- End diff -- Better to be "duplicate cores in NUMA config"? ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r229878430 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java --- @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) { return ret; } +/** + * Extracting out to mock it for tests. + * @return true if on Linux. + */ +protected boolean isOnLinux() { +return SystemUtils.IS_OS_LINUX; +} + +private void prefixNumaPinningIfApplicable(String numaId, List commandList) { +if (numaId != null) { +if (isOnLinux()) { +commandList.add("numactl"); +commandList.add("--i=" + numaId); +return; +} else { +// TODO : Add support for pinning on Windows host +throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts"); --- End diff -- Maybe `UnsupportedOperationException` is a better option. But I am fine with this. ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r229875813 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -1041,6 +1041,24 @@ @isPositiveNumber @NotNull public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs"; + +/** + * A map with keys mapped to each NUMA Node on the supervisor that will be used + * by scheduler. CPUs, memory and ports available on each NUMA node will be provided. + * Each supervisor will have different map of NUMAs. + * Example: "supervisor.numa.meta": { + * "0": { "memory.mb": 122880, "cores": [ 0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17], + * "ports": [6700, 6701]}, + * "1" : {"memory.mb": 122880, "cores": [ 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23], --- End diff -- "ports" should be "numa.ports" now, since `NUMA_PORTS = "numa.ports"` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228300609 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -121,11 +121,68 @@ private static String memoizedLocalHostnameString = null; public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); +public static final String NUMA_MEMORY_IN_MB = "MemoryInMB"; +public static final String NUMA_CORES = "Cores"; +public static final String NUMAS_PORTS = "Ports"; +public static final String NUMA_ID = "Id"; +public static final String NUMAS_BASE = "Numas"; + static { localConf = readStormConfig(); serializationDelegate = getSerializationDelegate(localConf); } +/** + * Validate supervisor numa configuration. + * @param stormConf stormConf + * @return getValidatedNumaMap + * @throws KeyNotFoundException + */ +public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException { +Map validatedNumaMap = new HashMap(); +Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META); +if (supervisorNumaMap == null) return validatedNumaMap; +if (!supervisorNumaMap.containsKey(NUMAS_BASE)) { +throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!"); +} +List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE); +if (numaEntries == null) return validatedNumaMap; +for (Map numa : numaEntries) { +for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) { +if (!numa.containsKey(key)) { +throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!"); +} +} +validatedNumaMap.put((String) numa.get(NUMA_ID), numa); +} +return validatedNumaMap; +} + +/** + * getNumaIdForPort. 
+ * @param port port + * @param stormConf stormConf + * @return getNumaIdForPort + * @throws KeyNotFoundException + */ +public static String getNumaIdForPort(Number port, Map stormConf) { +Map validatedNumaMap = null; +try { +validatedNumaMap = getValidatedNumaMap(stormConf); +} catch (KeyNotFoundException e) { +LOG.error("Exception while getting NUMA config", e); +return null; --- End diff -- I agree that we should throw an exception if it's misconfigured. If numa is configured, I think we want to make sure it's working properly, or fail fast if misconfigured. Storm admins might not notice the "ERROR" message at first and would be left wondering, after a relatively long time, why storm is not working as expected. If they don't want numa, they will not set the numa configs. ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228303554 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -87,7 +87,7 @@ public Slot(AsyncLocalizer localizer, Map conf, AtomicReference> cachedCurrentAssignments, OnlyLatestExecutor metricsExec, WorkerMetricsProcessor metricsProcessor, -SlotMetrics slotMetrics) throws Exception { +SlotMetrics slotMetrics, String numaId) throws Exception { --- End diff -- Can we remove this? It looks like it's not used anywhere ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228307991 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -119,15 +120,81 @@ // tests by subclassing. private static Utils _instance = new Utils(); private static String memoizedLocalHostnameString = null; -public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); +public static final Pattern TOPOLOGY_KEY_PATTERN = +Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); + +public static final String NUMA_MEMORY_IN_MB = "memory.mb"; +public static final String NUMA_CORES = "cores"; +public static final String NUMAS_PORTS = "ports"; +public static final String NUMA_ID = "node_id"; +public static final String NUMAS_BASE = "numas"; static { localConf = readStormConfig(); serializationDelegate = getSerializationDelegate(localConf); } /** - * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * Validate supervisor numa configuration. + * @param stormConf stormConf + * @return getValidatedNumaMap + * @throws KeyNotFoundException + */ +public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException { +Map validatedNumaMap = new HashMap(); +Map supervisorNumaMap = +(Map) stormConf.get(Config.SUPERVISOR_NUMA_META); +if (supervisorNumaMap == null) { +return validatedNumaMap; +} +if (!supervisorNumaMap.containsKey(NUMAS_BASE)) { +throw new KeyNotFoundException( +"The numa configurations [" + NUMAS_BASE + "] is missing!" 
+); } +List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE); +if (numaEntries == null) return validatedNumaMap; +for (Map numa : numaEntries) { +for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) { +if (!numa.containsKey(key)) { --- End diff -- Since we calculate cpu capacity based on the size of `NUMA_CORES`, do we need to validate that there are no dups in `NUMA_CORES`? For example, what will happen if `NUMA_CORES` is `[0, 1, 1, 3]`? Another example: ``` {"node_id": 0, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6700, 6701]}, {"node_id": 1, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6702, 6703]} ``` ---
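The validation being asked for could look something like the sketch below, which flags duplicate core ids both within one NUMA entry and across entries. `NumaCoresSketch` and `hasDuplicateCores` are hypothetical names, not part of Storm's validators:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch for the review question above: since cpu capacity is
// computed from the size of the cores list, a core id claimed twice (by the
// same node or by two different nodes) would inflate the capacity.
class NumaCoresSketch {
    static boolean hasDuplicateCores(List<List<Integer>> coresPerNode) {
        Set<Integer> seen = new HashSet<>();
        for (List<Integer> cores : coresPerNode) {
            for (Integer core : cores) {
                if (!seen.add(core)) {
                    return true; // core already claimed by this or another node
                }
            }
        }
        return false;
    }
}
```

Both of the reviewer's examples (`[0, 1, 1, 3]` within one node, and `[0, 1]` repeated across two nodes) would be caught by this check.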
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228298411 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -119,15 +120,81 @@ // tests by subclassing. private static Utils _instance = new Utils(); private static String memoizedLocalHostnameString = null; -public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); +public static final Pattern TOPOLOGY_KEY_PATTERN = +Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); + +public static final String NUMA_MEMORY_IN_MB = "memory.mb"; +public static final String NUMA_CORES = "cores"; +public static final String NUMAS_PORTS = "ports"; +public static final String NUMA_ID = "node_id"; +public static final String NUMAS_BASE = "numas"; static { localConf = readStormConfig(); serializationDelegate = getSerializationDelegate(localConf); } /** - * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * Validate supervisor numa configuration. + * @param stormConf stormConf + * @return getValidatedNumaMap + * @throws KeyNotFoundException + */ +public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException { +Map validatedNumaMap = new HashMap(); +Map supervisorNumaMap = +(Map) stormConf.get(Config.SUPERVISOR_NUMA_META); +if (supervisorNumaMap == null) { +return validatedNumaMap; +} +if (!supervisorNumaMap.containsKey(NUMAS_BASE)) { +throw new KeyNotFoundException( --- End diff -- I think `KeyNotFoundException` is mostly about `blobstore`. And I don't think we should use it here. I think it's more like a `IllegalArgumentException` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228321490 --- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java --- @@ -319,7 +319,15 @@ public void testLaunch() throws Exception { superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal); superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir); superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true"); - +Map numaNode = new HashMap(); +numaNode.put(Utils.NUMA_ID, "0"); +numaNode.put(Utils.NUMA_CORES, Collections.singletonList("0")); +numaNode.put(Utils.NUMAS_PORTS, Collections.singletonList(8081)); +numaNode.put(Utils.NUMA_MEMORY_IN_MB, 2048); +Map numaMap = new HashMap(); +numaMap.put(Utils.NUMAS_BASE, Collections.singletonList(numaNode)); + +superConf.put(Config.SUPERVISOR_NUMA_META, numaMap); --- End diff -- Remove this since `testNumaPinnedLaunch()` is available ? ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228319880 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java --- @@ -81,6 +81,20 @@ public void add(NormalizedResourcesWithMemory other) { totalMemoryMb += other.getTotalMemoryMb(); } +/** + * Remove the resources in other from this. + * @param other the resources to be removed. + * @return true if one or more resources in other were larger than available resources in this, else false. + */ +public boolean remove(NormalizedResourcesWithMemory other) { --- End diff -- Maybe change `public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics)` and then just call `remove(other, null)` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228298658 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -119,15 +120,81 @@ // tests by subclassing. private static Utils _instance = new Utils(); private static String memoizedLocalHostnameString = null; -public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); +public static final Pattern TOPOLOGY_KEY_PATTERN = +Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS); + +public static final String NUMA_MEMORY_IN_MB = "memory.mb"; +public static final String NUMA_CORES = "cores"; +public static final String NUMAS_PORTS = "ports"; +public static final String NUMA_ID = "node_id"; +public static final String NUMAS_BASE = "numas"; static { localConf = readStormConfig(); serializationDelegate = getSerializationDelegate(localConf); } /** - * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that + * Validate supervisor numa configuration. + * @param stormConf stormConf + * @return getValidatedNumaMap + * @throws KeyNotFoundException + */ +public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException { +Map validatedNumaMap = new HashMap(); +Map supervisorNumaMap = +(Map) stormConf.get(Config.SUPERVISOR_NUMA_META); +if (supervisorNumaMap == null) { +return validatedNumaMap; +} +if (!supervisorNumaMap.containsKey(NUMAS_BASE)) { +throw new KeyNotFoundException( +"The numa configurations [" + NUMAS_BASE + "] is missing!" +); +} +List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE); +if (numaEntries == null) return validatedNumaMap; +for (Map numa : numaEntries) { +for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) { +if (!numa.containsKey(key)) { +throw new KeyNotFoundException( --- End diff -- Same here. 
I think `IllegalArgumentException` is better ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228301282 --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java --- @@ -327,10 +396,12 @@ public static boolean isSystemId(String id) { } /** - * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous + * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds + * equal to the return value of the previous * call. --- End diff -- nit: put "call" on the same line as the other words? ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228321712 --- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java --- @@ -390,6 +398,116 @@ public void testLaunch() throws Exception { "storm.log.dir", stormLogDir); } +@Test +public void testNumaPinnedLaunch() throws Exception { +final String topoId = "test_topology_current"; --- End diff -- it doesn't matter much, but it seems better to have `topoId = "test_topology_numa_pinned";` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228266730 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java --- @@ -12,16 +12,24 @@ package org.apache.storm.daemon.supervisor.timer; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; + --- End diff -- ```suggestion ``` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228264081 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java --- @@ -77,6 +91,15 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri } // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number --- End diff -- ```suggestion ``` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228266466 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -17,14 +17,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + --- End diff -- ```suggestion ``` ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228227560 --- Diff: storm-client/src/jvm/org/apache/storm/Config.java --- @@ -1041,6 +1041,19 @@ @isPositiveNumber @NotNull public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs"; + +/** + * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used --- End diff -- What does `blobstore keys` mean here? ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228265639 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java --- @@ -607,6 +608,27 @@ protected String javaCmd(String cmd) { return ret; } +/** + * Extracting out to mock it for tests. + * @return true if on Linux. + */ +protected boolean isOnLinux() { +return SystemUtils.IS_OS_LINUX; +} + +private void addNumaPinningIfApplicable(String numaId, List commandList) { --- End diff -- It can be confusing since we are actually adding `numactl` as a prefix. The name makes it look like we are appending `numactl` to the end of `commandList`, even though at the current call site `commandList` happens to be empty. ---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2881#discussion_r228264226 --- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java --- @@ -101,7 +124,11 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri return nimbusInfoSet; } -// Get sequence number details from latest sequence number of the blob +/** + * Get sequence number details from latest sequence number of the blob --- End diff -- ```suggestion * Get sequence number details from latest sequence number of the blob. ``` ---
[GitHub] storm pull request #2744: [STORM-3132] Avoid NPE in the Values Constructor
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2744#discussion_r225330762 --- Diff: storm-client/src/jvm/org/apache/storm/tuple/Values.java --- @@ -23,9 +23,13 @@ public Values() { } public Values(Object... vals) { -super(vals.length); -for (Object o : vals) { -add(o); +super(vals != null ? vals.length : 0); --- End diff -- I think this can be `super(vals != null ? vals.length : 1);` since if `vals == null` we are going to add `null` to it, so the length will be 1. ---
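The subtlety here is that calling a varargs constructor with an explicit null array (e.g. `new Values((Object[]) null)`) makes `vals` itself null. A sketch of the sizing point, using the hypothetical class `ValuesSketch` rather than Storm's actual `Values` (whose null-handling is what the PR is deciding):

```java
import java.util.ArrayList;

// Hypothetical sketch: when vals is null, the list ends up holding one
// null element, so an initial capacity of 1 matches what is stored.
class ValuesSketch extends ArrayList<Object> {
    ValuesSketch(Object... vals) {
        super(vals != null ? vals.length : 1);
        if (vals != null) {
            for (Object o : vals) {
                add(o);
            }
        } else {
            add(null); // treat an explicit null array as a single null value
        }
    }
}
```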
[GitHub] storm issue #2844: STORM-3232: Display on the UI all versions of storm that ...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2844 Why is `Topologies matching 1.0 will run under 2.0-unittests`? Should it be `run under 2.0.0-SNAPSHOT` since the nimbus version is `2.0.0-SNAPSHOT`? ---
[GitHub] storm pull request #2843: STORM-3230: Add in sync if key not found
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2843#discussion_r218905963 --- Diff: storm-client/src/jvm/org/apache/storm/security/auth/workertoken/WorkerTokenAuthorizer.java --- @@ -95,9 +95,12 @@ private static IStormClusterState buildStateIfNeeded(Map conf, T throw new IllegalArgumentException("Token is not valid, token has expired."); } -PrivateWorkerKey key = keyCache.getUnchecked(deser); -if (key == null) { -throw new IllegalArgumentException("Token is not valid, private key not found."); +PrivateWorkerKey key; +try { +key = keyCache.getUnchecked(deser); +} catch (CacheLoader.InvalidCacheLoadException e) { +//This happens when the cache has a null returned to it. --- End diff -- not sure about this. If the loader returns null, `getUnchecked(deser)` will throw an exception rather than return null, won't it? ---
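The behavior under discussion (a loading cache never hands back null; a null-producing loader triggers an exception, so the old `key == null` check was dead code) can be modeled without Guava. `CacheLookupSketch` and its exception choice are hypothetical stand-ins for Guava's `LoadingCache.getUnchecked` and `CacheLoader.InvalidCacheLoadException`:

```java
import java.util.function.Function;

// Guava-free sketch of the behavior discussed above: a loading-cache-style
// lookup never returns null; if the loader produces null, the lookup throws,
// so callers must catch rather than null-check the result.
class CacheLookupSketch {
    static <K, V> V getUnchecked(Function<K, V> loader, K key) {
        V value = loader.apply(key);
        if (value == null) {
            // stands in for Guava's CacheLoader.InvalidCacheLoadException
            throw new IllegalStateException("loader returned null for key " + key);
        }
        return value;
    }
}
```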
[GitHub] storm pull request #2823: [STORM-3206] validate topoConf only against Config...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2823 [STORM-3206] validate topoConf only against Config.java during submission Storm should only validate client-side conf when user submits a topology. Otherwise settings like `logviewer.https.port=-1` will fail the submission. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm STORM-3206 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2823.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2823 commit e9af95e9b6a6ebc74fec833dc83c6bd18751313c Author: Ethan Li Date: 2018-09-11T15:29:30Z [STORM-3206] validate topoConf only against Config.java during submission ---
[GitHub] storm pull request #2820: STORM-3215: Add back in impersonation to UI
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2820#discussion_r215690566 --- Diff: storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java --- @@ -22,74 +22,128 @@ import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.ThriftClient; import org.apache.storm.security.auth.ThriftConnectionType; -import org.apache.storm.shade.com.google.common.collect.Lists; -import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Client used for connecting to nimbus. Typically you want to use a variant of the + * `getConfiguredClient` static method to get a client to use, as directly putting in + * a host and port does not support nimbus high availability. + */ public class NimbusClient extends ThriftClient { private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); private static volatile Nimbus.Iface _localOverrideClient = null; private static String oldLeader = ""; /** * Indicates if this is a special client that is overwritten for local mode. */ -public final boolean _isLocal; -private Nimbus.Iface _client; +public final boolean isLocal; +private final Nimbus.Iface client; +/** + * Constructor, Please try to use `getConfiguredClient` instead of calling this directly. + * @param conf the conf for the client. + * @param host the host the client is to talk to. + * @param port the port the client is to talk to. + * @throws TTransportException on any error. + */ +@Deprecated public NimbusClient(Map conf, String host, int port) throws TTransportException { this(conf, host, port, null, null); } +/** + * Constructor, Please try to use `getConfiguredClient` instead of calling this directly. + * @param conf the conf for the client. + * @param host the host the client is to talk to. + * @param port the port the client is to talk to. 
+ * @param timeout the timeout to use when connecting. + * @throws TTransportException on any error. + */ public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, null); -_client = new Nimbus.Client(_protocol); -_isLocal = false; +client = new Nimbus.Client(_protocol); +isLocal = false; } +/** + * Constructor, Please try to use `getConfiguredClientAs` instead of calling this directly. + * @param conf the conf for the client. + * @param host the host the client is to talk to. + * @param port the port the client is to talk to. + * @param timeout the timeout to use when connecting. + * @param asUser the name of the user you want to impersonate (use with caution as it is not always supported). + * @throws TTransportException on any error. + */ public NimbusClient(Map conf, String host, Integer port, Integer timeout, String asUser) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, port, timeout, asUser); -_client = new Nimbus.Client(_protocol); -_isLocal = false; +client = new Nimbus.Client(_protocol); +isLocal = false; } +/** + * Constructor, Please try to use `getConfiguredClient` instead of calling this directly. + * @param conf the conf for the client. + * @param host the host the client is to talk to. + * @throws TTransportException on any error. + */ public NimbusClient(Map conf, String host) throws TTransportException { super(conf, ThriftConnectionType.NIMBUS, host, null, null, null); -_client = new Nimbus.Client(_protocol); -_isLocal = false; +client = new Nimbus.Client(_protocol); +isLocal = false; } private NimbusClient(Nimbus.Iface client) { super(new HashMap<>(), ThriftConnectionType.LOCAL_FAKE, "localhost", null, null, null); -_client = client; -_isLocal = true; +this.client = client; +isLocal = true; } /** + * Is the local override set or not. 
* @return true of new clients will be overridden to connect to a local cluster and not the configured remote cluster. */ public static boolean isLocalOverride() { return _l
[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2808 Could you please re-check? Thanks! @HeartSaVioR @revans2 ---
[GitHub] storm pull request #2819: [STORM-3213] fix server error on system component ...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2819 [STORM-3213] fix server error on system component page on storm UI Fix ``` 2018-09-05 16:15:24.927 o.a.s.t.ProcessFunction pool-21-thread-55 [ERROR] Internal error processing getComponentPageInfo java.lang.RuntimeException: java.lang.NullPointerException at org.apache.storm.daemon.nimbus.Nimbus.getComponentPageInfo(Nimbus.java:4238) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:4577) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.generated.Nimbus$Processor$getComponentPageInfo.getResult(Nimbus.java:4556) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.thrift.ProcessFunction.process(ProcessFunction.java:38) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.thrift.TBaseProcessor.process(TBaseProcessor.java:39) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.security.auth.SimpleTransportPlugin$SimpleWrapProcessor.process(SimpleTransportPlugin.java:169) [storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:518) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.thrift.server.Invocation.run(Invocation.java:18) [shaded-deps-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] Caused by: java.lang.NullPointerException at org.apache.storm.scheduler.resource.ResourceUtils.getBoltResources(ResourceUtils.java:37) ~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] at org.apache.storm.daemon.nimbus.Nimbus.getComponentPageInfo(Nimbus.java:4192) 
~[storm-server-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT] ... 10 more ``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm STORM-3213 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2819.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2819 commit 26a6c878ce368ddba026a2977f484523a70f3d56 Author: Ethan Li Date: 2018-09-05T19:31:07Z [STORM-3213] fix server error on system component page on storm UI ---
[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2808 The failed tests should be unrelated. ---
[GitHub] storm issue #2808: [STORM-3131] Support hostname-substitution for blobstore....
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2808 Sorry forgot this PR. I will do it soon. Thanks @revans2 ---
[GitHub] storm pull request #2808: [STORM-3131] Support hostname-substitution for blo...
GitHub user Ethanlm opened a pull request: https://github.com/apache/storm/pull/2808 [STORM-3131] Support hostname-substitution for blobstore.hdfs.principal https://issues.apache.org/jira/browse/STORM-3131 With this feature, we can set ``` blobstore.hdfs.principal: /HOSTNAME@domain ``` on nodes (nimbi, supervisors), and the `HOSTNAME` will be replaced with the actual hostname of that host. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Ethanlm/storm STORM-3131 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2808.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2808 commit ed815638d22d976e710295e977642e3e271eff8b Author: Ethan Li Date: 2018-08-17T13:26:29Z [STORM-3131] Support hostname-substitution for blobstore.hdfs.principal ---
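A hedged sketch of the substitution the PR describes; the helper and class names below are assumptions made for illustration, not Storm's actual implementation:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative helper: replace the literal HOSTNAME token in a configured
// principal (e.g. "/HOSTNAME@domain") with this node's own hostname.
class PrincipalSubstitution {
    static String substituteHostname(String principal, String hostname) {
        if (principal == null) {
            return null;
        }
        // a principal without the token passes through unchanged
        return principal.replace("HOSTNAME", hostname);
    }

    // On each node (nimbus, supervisor) the hostname would come from the
    // local machine, so one shared config value works cluster-wide.
    static String localHostname() throws UnknownHostException {
        return InetAddress.getLocalHost().getCanonicalHostName();
    }
}
```

The point of the feature is that a single `blobstore.hdfs.principal` value in the shared config resolves to a per-host principal on every node.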
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209348428 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4745,194 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); --- End diff -- This better to be `LOG.debug`? ---
[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2789 sure. Take your time please. ---
[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2789 That's right. Will take a look at non-static registry after #2764 and #2764 ---
[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2789 Tested it manually on a single-node cluster. `nimbus:num-shutdown-calls` is now flushed with this patch. Still need to figure out the Travis build failure before merging this in. ---
[GitHub] storm issue #2789: STORM-3173: flush metrics to ScheduledReporter on shutdow...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2789 I am also getting ` org.apache.maven.surefire.booter.SurefireBooterForkException: Error occurred in starting fork, check output in log` on my VM. And the tests passed without this patch. Still need to investigate the issue ---
[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2789#discussion_r209274375 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -96,4 +99,10 @@ public static void startMetricsReporters(Map topoConf) { throw e; } } + +@FunctionalInterface +public interface Session extends AutoCloseable { --- End diff -- Do we need this? We could have `startMetricsReporters` return the preparableReporters; that would be clearer. But if you prefer to keep this, you might want to pick a better name, because this one is way too general. ---
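One way to picture the pattern under discussion, returning something closeable from `startMetricsReporters` so a shutdown hook can flush and stop the reporters, is this illustrative sketch (the `Reporter` and `MetricsSession` names are hypothetical, not Storm's):

```java
import java.util.List;

// Minimal reporter contract: report() pushes buffered metrics, stop() shuts
// the reporter down. Dropwizard's ScheduledReporter exposes similar methods.
interface Reporter {
    void report();
    void stop();
}

// AutoCloseable handle over the started reporters. Registering
// session::close as a shutdown hook gives every reporter one final
// flush before the daemon exits, which is the goal of STORM-3173.
class MetricsSession implements AutoCloseable {
    private final List<Reporter> reporters;

    MetricsSession(List<Reporter> reporters) {
        this.reporters = reporters;
    }

    @Override
    public void close() {
        for (Reporter r : reporters) {
            r.report(); // final flush so the last metrics are not lost
            r.stop();
        }
    }
}
```

Returning the reporters (or a handle like this) from `startMetricsReporters` makes the flush-on-shutdown responsibility explicit at the call site.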
[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2789#discussion_r209269207 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java --- @@ -18,16 +18,12 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.storm.daemon.metrics.ClientMetricsUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class ConsolePreparableReporter implements PreparableReporter { -private static final Logger LOG = LoggerFactory.getLogger(ConsolePreparableReporter.class); -ConsoleReporter reporter = null; +public class ConsolePreparableReporter extends ScheduledPreparableReporter { @Override public void prepare(MetricRegistry metricsRegistry, Map topoConf) { -LOG.debug("Preparing..."); +log.debug("Preparing..."); --- End diff -- `LOG` is used everywhere. I think it's better to use `LOG` here too ---
[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2789#discussion_r209274273 --- Diff: storm-server/src/main/java/org/apache/storm/pacemaker/Pacemaker.java --- @@ -49,7 +49,7 @@ public Pacemaker(Map conf) { heartbeats = new ConcurrentHashMap<>(); this.conf = conf; StormMetricsRegistry.registerGauge("pacemaker:size-total-keys", heartbeats::size); -StormMetricsRegistry.startMetricsReporters(conf); + Utils.addShutdownHookWithForceKillIn1Sec(StormMetricsRegistry.startMetricsReporters(conf)::close); --- End diff -- better to break this into two lines ---
[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2789#discussion_r209301643 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java --- @@ -316,7 +317,7 @@ public void launchDaemon() { //This will only get updated once StormMetricsRegistry.registerMeter("supervisor:num-launched").mark(); StormMetricsRegistry.registerMeter("supervisor:num-shell-exceptions", ShellUtils.numShellExceptions); -StormMetricsRegistry.startMetricsReporters(conf); +metricsReporters = StormMetricsRegistry.startMetricsReporters(conf); --- End diff -- since you are not really returning metricsReporters here, better not to use this name `metricsReporters` ---
[GitHub] storm pull request #2789: STORM-3173: flush metrics to ScheduledReporter on ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2789#discussion_r209271682 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/JmxPreparableReporter.java --- @@ -22,9 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class JmxPreparableReporter implements PreparableReporter { +public class JmxPreparableReporter implements PreparableReporter { --- End diff -- Why not extend `ScheduledReporter`? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r209070836 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2891,33 +2884,26 @@ public void launchServer() throws Exception { .mapToDouble(SupervisorResources::getTotalCpu) .sum()); StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +//We want to update longest scheduling time in real time in case scheduler get stuck +// It's normal to see some very minor jiggling in value as race condition may happen Long currTime = Time.nanoTime(); -Long startTime = schedulingStartTime.get(); -//There could be race condition here but seems trivial, elapsed is -// guaranteed to be no longer than real elapsed time of scheduling -Long longest = longestSchedulingTime.get(); -if (startTime != null) { -longest = currTime - startTime > longest ? currTime - startTime : longest; -} -//To millis. How should I put the constant for magic numbers? -return longest * 1e-6; +Long startTime = schedulingStartTimeNs.get(); +return TimeUnit.NANOSECONDS.toMillis(startTime == null ? +longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get())); }); StormMetricsRegistry.registerMeter("nimbus:num-launched").mark(); StormMetricsRegistry.startMetricsReporters(conf); -//IntelliJ suggests clusterConsumerExceutors always non null (unnecessary if statement) -if (clusterConsumerExceutors != null) { -timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), -() -> { -try { -if (isLeader()) { - sendClusterMetricsToExecutors(); -} -} catch (Exception e) { -throw new RuntimeException(e); +timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), +() -> { +try { +if (isLeader()) { + sendClusterMetricsToExecutors(); --- End diff -- sorry a mistake ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r209066631 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -267,8 +267,8 @@ private static final Histogram numAddedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling", new ExponentiallyDecayingReservoir()); private static final Histogram numRemovedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling", new ExponentiallyDecayingReservoir()); private static final Histogram numRemovedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling", new ExponentiallyDecayingReservoir()); -private static final Histogram numNetExecChangePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-changed-per-scheduling", new ExponentiallyDecayingReservoir()); -private static final Histogram numNetSlotChangePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-changed-per-scheduling", new ExponentiallyDecayingReservoir()); +private static final Histogram numNetExecIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-changed-per-scheduling", new ExponentiallyDecayingReservoir()); +private static final Histogram numNetSlotIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-changed-per-scheduling", new ExponentiallyDecayingReservoir()); --- End diff -- better to change `nimbus:num-net-slots-changed-per-scheduling` too ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r209067011 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2891,33 +2884,26 @@ public void launchServer() throws Exception { .mapToDouble(SupervisorResources::getTotalCpu) .sum()); StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +//We want to update longest scheduling time in real time in case scheduler get stuck +// It's normal to see some very minor jiggling in value as race condition may happen Long currTime = Time.nanoTime(); -Long startTime = schedulingStartTime.get(); -//There could be race condition here but seems trivial, elapsed is -// guaranteed to be no longer than real elapsed time of scheduling -Long longest = longestSchedulingTime.get(); -if (startTime != null) { -longest = currTime - startTime > longest ? currTime - startTime : longest; -} -//To millis. How should I put the constant for magic numbers? -return longest * 1e-6; +Long startTime = schedulingStartTimeNs.get(); +return TimeUnit.NANOSECONDS.toMillis(startTime == null ? +longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get())); }); StormMetricsRegistry.registerMeter("nimbus:num-launched").mark(); StormMetricsRegistry.startMetricsReporters(conf); -//IntelliJ suggests clusterConsumerExceutors always non null (unnecessary if statement) -if (clusterConsumerExceutors != null) { -timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), -() -> { -try { -if (isLeader()) { - sendClusterMetricsToExecutors(); -} -} catch (Exception e) { -throw new RuntimeException(e); +timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)), +() -> { +try { +if (isLeader()) { + sendClusterMetricsToExecutors(); --- End diff -- looks like a typo here? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209016305 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208965124

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
     }

+    //enum NotPorted {
+    ////Declared in StormConf. I don't see the value in reporting so.
+    //SUPERVISOR_TOTAL_RESOURCE,
+    ////May be able to aggregate based on status;
+    //TOPOLOGY_STATUS,
+    //TOPOLOGY_SCHED_STATUS,
+    ////May be aggregated, as well as other distinct values
+    //NUM_DISTINCT_NIMBUS_VERSION;
+    //}

--- End diff --

You can file a separate JIRA to discuss it and remove it from this PR.

---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208987159 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); --- End diff -- use lowercase for "CPU" to be consistent with other names? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208984273

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -2871,16 +2897,15 @@ public void launchServer() throws Exception {
             }
         });
-        StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
-        StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
-        StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
-        StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values()
+        //Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation
+        // Num supervisor, and fragmented resources have been included in cluster summary
+        StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values()

--- End diff --

Why do we need `non-negative`?

---
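The caution quoted above concerns when the receiver of a method reference is evaluated: per the Java Language Specification, the expression before `::` is evaluated once, when the reference is created, not on each call. A self-contained illustration (the names are made up, not from the Storm code):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class MethodRefCapture {
    public static void main(String[] args) {
        AtomicReference<String> ref = new AtomicReference<>("old");

        // Method reference: the receiver expression ref.get() is evaluated
        // once, when the reference is created, so "old" is captured forever.
        Supplier<Integer> captured = ref.get()::length;

        // Lambda: ref.get() is re-evaluated on every call.
        Supplier<Integer> fresh = () -> ref.get().length();

        ref.set("updated");
        System.out.println(captured.get()); // 3 ("old".length())
        System.out.println(fresh.get());    // 7 ("updated".length())
    }
}
```

This is why a gauge over `nodeIdToResources.get()` needs a lambda: a method reference would snapshot whatever map the `AtomicReference` held at registration time.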
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209007420

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
     }

+    //enum NotPorted {
+    ////Declared in StormConf. I don't see the value in reporting so.
+    //SUPERVISOR_TOTAL_RESOURCE,
+    ////May be able to aggregate based on status;
+    //TOPOLOGY_STATUS,
+    //TOPOLOGY_SCHED_STATUS,
+    ////May be aggregated, as well as other distinct values
+    //NUM_DISTINCT_NIMBUS_VERSION;
+    //}
+
+    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+        static final int CACHING_WINDOW = 5;
+        static final int PORTED_METRICS = 25;
+        static final String SUMMARY = "summary";

--- End diff --

this is not used

---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208986783 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); --- End diff -- `ported` is not clear out of context of this PR. better to use something like `clusterSummaryMetrics` ---
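For readers outside the PR: the pattern being named here is a factory function that registers each metric in a shared map as a side effect, so each field can be initialized with a one-liner. A pure-JDK sketch using `LongAdder` as a stand-in for a Dropwizard `Histogram`, with the map renamed per the suggestion:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

public class MetricSetSketch {
    public static void main(String[] args) {
        Map<String, LongAdder> clusterSummaryMetrics = new HashMap<>();

        // The factory both creates the metric and records it under its name.
        Function<String, LongAdder> registerHistogram = name -> {
            LongAdder histogram = new LongAdder();
            clusterSummaryMetrics.put(name, histogram);
            return histogram;
        };

        LongAdder supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
        supervisorsUptime.add(120);

        // The same object is reachable through the field and the map.
        System.out.println(clusterSummaryMetrics.get("supervisors:uptime-secs").sum()); // 120
    }
}
```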
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208965019

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -2918,6 +2943,10 @@ public void launchServer() throws Exception {
                 }
             });
         }
+
+        //Should we make the delaySecs and recurSecs in sync with any conf value?
+        // They should be around the reporting interval, but it's not configurable
+        timer.scheduleRecurring(5, 5, clusterMetricSet);

--- End diff --

Why do we set these to 5 and 5?

---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208988762 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol --- End diff -- assertion is good. Just curious in which case this will happen ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208991078 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208986545 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; --- End diff -- Don't need constant since it's only an initial capacity of map ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r209005776

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java ---

@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR
         }
     }

+    /**
+     * Find the first N matches of target string in files.
+     * @param logs all candidate log files to search
+     * @param numMatches number of matches expected
+     * @param fileOffset Unclear metrics

--- End diff --

We can discuss it in a separate JIRA.

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208723928

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -2219,21 +2288,16 @@ private void mkAssignments(String scratchTopoId) throws Exception {
                 }
                 newAssignments.put(topoId, newAssignment);
             }
-            if (!newAssignments.equals(existingAssignments)) {
+            boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments);
+            if (assignmentChanged) {
                 LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-                LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-                nodeIdToResources.get().forEach((id, node) ->
-                    LOG.info(
-                        "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-                        + "CPU: {}, Available CPU: {}, fragmented: {}",
-                        id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                        node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
                 idToResources.set(new HashMap<>());
                 idToWorkerResources.set(new HashMap<>());
             }
             //tasks figure out what tasks to talk to by looking at topology at runtime
             // only log/set when there's been a change to the assignment
+            // TODO: why do we have loop fission here

--- End diff --

remove comment

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208707067

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java ---

@@ -134,6 +150,7 @@ public Response daemonLog(@Context HttpServletRequest request) throws IOExceptio
      */
     @GET
     @Path("/searchLogs")
+    //Seems redundant

--- End diff --

remove

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208723798

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---

@@ -1984,11 +2057,13 @@ private int fragmentedCpu() {
         Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
         cluster.setStatusMap(idToSchedStatus.get());
-        long beforeSchedule = System.currentTimeMillis();
+        schedulingStartTime.set(Time.nanoTime());
         scheduler.schedule(topologies, cluster);
-        long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
-        LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
-        scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+        //Will compiler optimize the order of evalutation and cause race condition?

--- End diff --

remove comment

---
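On the removed question about evaluation order: within a single thread, Java evaluates the operands of an expression left-to-right, so the original `System.currentTimeMillis() - beforeSchedule` form had no reordering hazard (sharing the start time across threads is a separate concern). A minimal sketch of the elapsed-time pattern the diff replaces, using the monotonic `nanoTime()` clock; the sleep stands in for `scheduler.schedule(...)`:

```java
import java.util.concurrent.TimeUnit;

public class SchedulingTimer {
    public static void main(String[] args) throws InterruptedException {
        // System.nanoTime() is monotonic, unlike currentTimeMillis(),
        // so it is the right clock for measuring elapsed time.
        long beforeSchedule = System.nanoTime();
        Thread.sleep(20); // stand-in for scheduler.schedule(topologies, cluster)
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeSchedule);
        System.out.println("scheduling took " + elapsedMs + " ms");
    }
}
```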
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208691016

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java ---

@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR
         }
     }

+    /**
+     * Find the first N matches of target string in files.
+     * @param logs all candidate log files to search
+     * @param numMatches number of matches expected
+     * @param fileOffset Unclear metrics

--- End diff --

The comment seems off.

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208726809 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java --- @@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR } } +/** + * Find the first N matches of target string in files. + * @param logs all candidate log files to search + * @param numMatches number of matches expected + * @param fileOffset Unclear metrics + * @param startByteOffset number of byte to be ignored in each log file + * @param targetStr searched string + * @return all matched results + */ @VisibleForTesting -Matched findNMatches(List logs, int numMatches, int fileOffset, int offset, String search) { +Matched findNMatches(List logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) { logs = drop(logs, fileOffset); +LOG.debug("{} files to scan", logs.size()); List> matches = new ArrayList<>(); int matchCount = 0; +int scannedFiles = 0; +//TODO: Unnecessarily convoluted loop that should be optimized while (true) { if (logs.isEmpty()) { break; } File firstLog = logs.get(0); -Map theseMatches; +Map matchInLog; try { LOG.debug("Looking through {}", firstLog); -theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset); +matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset); +scannedFiles++; } catch (InvalidRequestException e) { LOG.error("Can't search past end of file.", e); -theseMatches = new HashMap<>(); +matchInLog = new HashMap<>(); } String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog); +//This section simply put the formatted log filename and corresponding port in the matching. 
final List> newMatches = new ArrayList<>(matches); -Map currentFileMatch = new HashMap<>(theseMatches); +Map currentFileMatch = new HashMap<>(matchInLog); currentFileMatch.put("fileName", fileName); Path firstLogAbsPath; try { firstLogAbsPath = firstLog.getCanonicalFile().toPath(); } catch (IOException e) { throw new RuntimeException(e); } +//Why do we need to start from scratch to retrieve just the port here? currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString()); newMatches.add(currentFileMatch); -int newCount = matchCount + ((List)theseMatches.get("matches")).size(); +int newCount = matchCount + ((List)matchInLog.get("matches")).size(); -//theseMatches is never empty! As guaranteed by the #get().size() method above +//matchInLog is never empty! As guaranteed by the #get().size() method above if (newCount == matchCount) { // matches and matchCount is not changed logs = rest(logs); -offset = 0; +startByteOffset = 0; fileOffset = fileOffset + 1; } else if (newCount >= numMatches) { matches = newMatches; break; } else { matches = newMatches; logs = rest(logs); -offset = 0; +startByteOffset = 0; fileOffset = fileOffset + 1; matchCount = newCount; } } -return new Matched(fileOffset, search, matches); +LOG.info("scanned {} files", scannedFiles); +//fileOffset is not being used and it behaves inconsistently (showing +// (index of files search ends on - 1) if [enough matches] else (index of files search ends on)) +// I don't think we should expose the data to public if it's not used. +// can I dropped this field or change its behavior so it's used for metrics [numScannedFiles]? --- End diff -- You can file a separate jira for this ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208705884

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java ---

@@ -55,6 +63,8 @@ public Response downloadFile(String fileName, String user, boolean isDaemon) thr
         File file = new File(rootDir, fileName).getCanonicalFile();
         if (file.exists()) {
             if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
+                //How should I put the constant for magic numbers?

--- End diff --

It's fine here.

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208704852

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java ---

@@ -223,8 +246,8 @@ void cleanupEmptyTopoDirectory(File dir) throws IOException {
     @VisibleForTesting
     FileFilter mkFileFilterForLogCleanup(long nowMillis) {
-        final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
-        return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+        //Doesn't it make more sense to do file.isDirectory here?

--- End diff --

I agree. Also, the name of this method is not clear.

---
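On the `!file.isFile()` question: the two predicates differ for paths that no longer exist, since a missing path fails both `isFile()` and `isDirectory()`, so `!isFile()` accepts it while `isDirectory()` rejects it. A self-contained sketch of the directory-only filter, using an illustrative cutoff instead of Storm's `cleanupCutoffAgeMillis`/`lastModifiedTimeWorkerLogdir` helpers:

```java
import java.io.File;
import java.io.FileFilter;
import java.nio.file.Files;

public class LogCleanupFilter {
    // Sketch of the filter under discussion: select directories whose
    // last-modified time is at or before the cutoff. Unlike !file.isFile(),
    // isDirectory() also rejects paths that have disappeared.
    static FileFilter mkFileFilterForLogCleanup(long cutoffAgeMillis) {
        return file -> file.isDirectory() && file.lastModified() <= cutoffAgeMillis;
    }

    public static void main(String[] args) throws Exception {
        File dir = Files.createTempDirectory("worker-logs").toFile();
        FileFilter filter = mkFileFilterForLogCleanup(System.currentTimeMillis() + 1_000);
        System.out.println(filter.accept(dir));                   // true: a directory older than the cutoff
        System.out.println(filter.accept(new File(dir, "gone"))); // false: not an existing directory
        dir.delete();
    }
}
```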
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208706459

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java ---

@@ -88,9 +87,14 @@ public void setLogFilePermission(String fileName) throws IOException {
         if (runAsUser && topoOwner.isPresent() && file.exists() && !Files.isReadable(file.toPath())) {
             LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner);
-            ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
-                Lists.newArrayList("blob", file.getCanonicalPath()), null,
-                "setup group read permissions for file: " + fileName);
+            try {
+                ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
+                    Lists.newArrayList("blob", file.getCanonicalPath()), null,
+                    "setup group read permissions for file: " + fileName);
+            } catch (IOException e) {
+                ExceptionMeters.NUM_PERMISSION_EXCEPTIONS.mark();

--- End diff --

The exception name is not clear.

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208703040

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java ---

@@ -186,7 +192,22 @@ private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set active
                     break;
                 }
             }
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();

--- End diff --

better to use `NUM_FILE_SCANNED_EXCEPTIONS`?

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208719630 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.si
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208689498

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java ---

@@ -71,6 +78,11 @@ public class LogviewerLogSearchHandler {
     private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+    private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+    private static final Histogram numFilesOpenedDeepSearch = StormMetricsRegistry.registerHistogram(
+        "logviewer:num-files-opened-deep-search", new ExponentiallyDecayingReservoir());

--- End diff --

"opened" -> "scanned"? More clear.

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208691270

--- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java ---

@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR
         }
     }

+    /**
+     * Find the first N matches of target string in files.
+     * @param logs all candidate log files to search
+     * @param numMatches number of matches expected
+     * @param fileOffset Unclear metrics
+     * @param startByteOffset number of byte to be ignored in each log file
+     * @param targetStr searched string
+     * @return all matched results
+     */
     @VisibleForTesting
-    Matched findNMatches(List logs, int numMatches, int fileOffset, int offset, String search) {
+    Matched findNMatches(List logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
         logs = drop(logs, fileOffset);
+        LOG.debug("{} files to scan", logs.size());
         List> matches = new ArrayList<>();
         int matchCount = 0;
+        int scannedFiles = 0;
+        //TODO: Unnecessarily convoluted loop that should be optimized

--- End diff --

remove comment

---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208718349 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.si
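The `inspectScheduling` logic above uses Guava's `Maps.difference` to split existing vs. new assignments into removed-only, added-only, and common entries before logging slot counts. A minimal JDK-only sketch of the same partitioning (the topology IDs and value types are illustrative, not Storm's actual `Assignment` structures):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class AssignmentDiff {
    // Keys present only in the existing map: these assignments were removed.
    static Set<String> removedKeys(Map<String, ?> existing, Map<String, ?> updated) {
        Set<String> removed = new HashSet<>(existing.keySet());
        removed.removeAll(updated.keySet());
        return removed;
    }

    // Keys present only in the updated map: these assignments were added.
    static Set<String> addedKeys(Map<String, ?> existing, Map<String, ?> updated) {
        Set<String> added = new HashSet<>(updated.keySet());
        added.removeAll(existing.keySet());
        return added;
    }
}
```

Guava's `MapDifference` additionally distinguishes entries whose key exists in both maps but whose values differ, which the code above would need a third pass to compute.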
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208703547 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java --- @@ -95,6 +102,9 @@ public LogCleaner(Map stormConf, WorkerLogs workerLogs, Director LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB", maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb); +//Switch to CachedGauge if this starts to hurt performance +// https://stackoverflow.com/questions/5857199/how-to-find-out-the-size-of-file-and-directory-in-java-without-creating-the-obje --- End diff -- remove link ---
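The code comment above suggests switching to Dropwizard's `CachedGauge` if recomputing worker-log directory sizes starts to hurt performance. A JDK-only sketch of the same memoize-for-an-interval idea (the class name is hypothetical; Dropwizard's `CachedGauge` is an abstract class whose `loadValue()` plays the role of the supplier here):

```java
import java.util.function.Supplier;

// Recompute an expensive value (e.g. a recursive directory-size walk)
// at most once per interval, like Dropwizard's CachedGauge.
class IntervalCachedSupplier<T> implements Supplier<T> {
    private final Supplier<T> loader;
    private final long intervalNanos;
    private long lastLoadedNanos;
    private boolean loaded;
    private T value;

    IntervalCachedSupplier(Supplier<T> loader, long intervalNanos) {
        this.loader = loader;
        this.intervalNanos = intervalNanos;
    }

    @Override
    public synchronized T get() {
        long now = System.nanoTime();
        if (!loaded || now - lastLoadedNanos >= intervalNanos) {
            value = loader.get(); // the expensive computation happens here
            lastLoadedNanos = now;
            loaded = true;
        }
        return value;
    }
}
```

Reporters that poll gauges every few seconds would then re-walk the directory only once per configured interval instead of once per poll.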
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208651482 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java --- @@ -265,32 +269,26 @@ public Response daemonLogPage(String fileName, Integer start, Integer length, St String rootDir = daemonLogRoot; File file = new File(rootDir, fileName).getCanonicalFile(); String path = file.getCanonicalPath(); -boolean isZipFile = path.endsWith(".gz"); if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) { // all types of files included List logFiles = Arrays.stream(new File(rootDir).listFiles()) .filter(File::isFile) .collect(toList()); -List filesStrWithoutFileParam = logFiles.stream() -.map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); - -List reorderedFilesStr = new ArrayList<>(); -reorderedFilesStr.addAll(filesStrWithoutFileParam); +List reorderedFilesStr = logFiles.stream() +.map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -String logString; -if (isTxtFile(fileName)) { -logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); -} else { -logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); +final boolean isZipFile = path.endsWith(".gz"); +long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); --- End diff -- why not reuse `getFileLength` ---
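For context on the `.gz` branch the review questions: reading a gzip file's uncompressed length is cheap because the format stores it (modulo 2^32) in the trailing 4-byte little-endian ISIZE field. A sketch of that trick under stated assumptions (the method name is hypothetical and not necessarily how Storm's `ServerUtils.zipFileSize` is implemented):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class GzipSize {
    // Uncompressed size of a .gz file, read from the little-endian ISIZE trailer.
    // Note: ISIZE is the original length modulo 2^32, so this under-reports
    // files larger than 4 GB.
    static long uncompressedSize(File gz) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(gz, "r")) {
            raf.seek(raf.length() - 4);
            long b0 = raf.read(), b1 = raf.read(), b2 = raf.read(), b3 = raf.read();
            return (b3 << 24) | (b2 << 16) | (b1 << 8) | b0;
        }
    }
}
```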
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208377660 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -48,6 +53,30 @@ public static Meter registerMeter(String name) { return REGISTRY.register(name, new Meter()); } +//Change the name to avoid name conflict in future Metrics release --- End diff -- change the name? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208394271 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java --- @@ -193,24 +198,23 @@ public Response logPage(String fileName, Integer start, Integer length, String g throw e.getCause(); } -List filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) -.filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); - -List reorderedFilesStr = new ArrayList<>(); -reorderedFilesStr.addAll(filesStrWithoutFileParam); +List reorderedFilesStr = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) +.filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -String logString; -if (isTxtFile(fileName)) { -logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); -} else { -logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); +//This is the same as what #pageFile(String path, Integer tail) does +// boolean isZipFile = path.endsWith(".gz"); +// long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); +// return pageFile(path, Long.valueOf(fileLength - tail).intValue(), tail); --- End diff -- remove comments ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208702901 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java --- @@ -124,6 +128,7 @@ public int deleteOldestWhileTooLarge(List dirs, File file = pq.poll(); stack.push(file); } +LOG.debug("pq: {}, stack: {}", pq, stack); --- End diff -- do we still need this? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208710506 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2890,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- fine here. Or you can use a constant. ---
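The "magic numbers" question above is about converting the `nanoTime()` delta to milliseconds; `TimeUnit` already encodes that constant, so no hand-written `1_000_000` is needed. A hedged sketch of the gauge body (the class is hypothetical; field names loosely follow the diff):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class SchedulingTimeGauge {
    // Longest completed scheduling round observed so far, in nanoseconds.
    private final AtomicLong longestSchedulingNanos = new AtomicLong(0);

    // schedulingStartNanos is the nanoTime() when the in-flight round began,
    // or null when no round is running (mirroring the diff's null check).
    long longestMillis(Long schedulingStartNanos) {
        long longest = longestSchedulingNanos.get();
        if (schedulingStartNanos != null) {
            longest = Math.max(longest, System.nanoTime() - schedulingStartNanos);
        }
        // TimeUnit replaces the magic-number division by 1_000_000
        return TimeUnit.NANOSECONDS.toMillis(longest);
    }
}
```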
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208377360 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java --- @@ -51,6 +51,7 @@ public void run() { private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() { Map localHeartbeats; try { +//TODO: This call no longer throws exceptions, do we still want to wrap it in try catch block? --- End diff -- remove comment ---
[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2743 merged in https://github.com/apache/storm/pull/2710 ---
[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2710 @zd-project Could you squash all the commits? Will merge this in. ---
[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2710 @HeartSaVioR @danny0405 Do you want to take a look again? Otherwise I think it's good to merge it. ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208340502 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java --- @@ -365,6 +384,7 @@ void addReferencesToBlobs(PortAndAssignment pna, BlobChangingCallback cb) if (!localResourceList.isEmpty()) { getBlobs(localResourceList, pna, cb); } +pna.complete(); --- End diff -- just one more nit: put this outside of this method since the method `addReferencesToBlobs` doesn't indicate `complete` ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208279317 --- Diff: storm-client/src/jvm/org/apache/storm/daemon/supervisor/ClientSupervisorUtils.java --- @@ -28,11 +29,14 @@ import org.apache.storm.shade.org.apache.commons.lang.StringUtils; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.ShellUtils; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientSupervisorUtils { +public static final Meter numWorkerLaunchExceptions = ShellUtils.numShellExceptions; --- End diff -- I would like to see some comments so people can understand why `ClientSupervisorUtils` is using the same meter with `ShellUtils` ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208092337 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java --- @@ -346,6 +358,10 @@ public void cleanUp() throws IOException { _usedMemory.remove(_port); _reservedMemory.remove(_port); cleanUpForRestart(); +} catch (IOException e) { +//This may or may not be reported depending on when process exits --- End diff -- I would like to better understand this. In which case the meter will not be reported? ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208095554 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -1008,6 +1015,13 @@ public void close() throws Exception { } static class DynamicState { +private static final Map workerStateTransition = EnumUtil.toEnumMap(MachineState.class, +machineState -> StormMetricsRegistry.registerMeter("supervisor:num-transitions-into-" + machineState.toString())); +private static final Map workerStateDuration = EnumUtil.toEnumMap(MachineState.class, +machineState -> StormMetricsRegistry.registerTimer( +"supervisor:num-transitions-out-" + machineState.toString() + "-and-duration-ms") --- End diff -- It would be good if we can find a better name ---
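The diff above builds one metric per `MachineState` eagerly via `EnumUtil.toEnumMap`. A JDK-only sketch of that pattern, using `LongAdder` counters in place of Dropwizard `Meter`s (the enum constants here are illustrative, not Slot's full state set):

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

class PerStateMetrics {
    enum MachineState { EMPTY, WAITING_FOR_BLOB_LOCALIZATION, RUNNING, KILL }

    // Build one value per enum constant up front, like EnumUtil.toEnumMap.
    static <E extends Enum<E>, V> Map<E, V> toEnumMap(Class<E> cls, Function<E, V> valueFor) {
        Map<E, V> map = new EnumMap<>(cls);
        for (E constant : cls.getEnumConstants()) {
            map.put(constant, valueFor.apply(constant));
        }
        return map;
    }

    // One counter per state; a state transition marks the counter for the
    // state being entered, as in the diff's workerStateTransition map.
    static final Map<MachineState, LongAdder> TRANSITIONS_INTO =
        toEnumMap(MachineState.class, state -> new LongAdder());
}
```

Because every constant gets an entry at construction time, lookups during state transitions never need a null check or lazy registration.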
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208279468 --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java --- @@ -251,11 +261,18 @@ private LocalizedResource getUserFile(String user, String key) { long localVersion = blob.getLocalVersion(); long remoteVersion = blob.getRemoteVersion(blobStore); if (localVersion != remoteVersion || !blob.isFullyDownloaded()) { +if (!blob.isFullyDownloaded()) { --- End diff -- Should be `if (!blob.isFullyDownloaded()) {` ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2710#discussion_r208096352 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java --- @@ -1147,8 +1162,19 @@ public DynamicState withPendingLocalization(Future pendingDownload) { */ public DynamicState withState(final MachineState state) { long newStartTime = Time.currentTimeMillis(); +//We may (though unlikely) lose metering here if state transition is too frequent (less than a millisecond) +workerStateDuration.get(this.state).update(newStartTime - startTime, TimeUnit.MILLISECONDS); +workerStateTransition.get(state).mark(); + +LocalAssignment assignment = this.currentAssignment; +if (this.state != MachineState.RUNNING && state == MachineState.RUNNING +&& this.currentAssignment instanceof TimerDecoratedAssignment) { +((TimerDecoratedAssignment) assignment).stopTiming(); +assignment = new LocalAssignment(this.currentAssignment); --- End diff -- A few comments would be helpful. ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207665264 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- It is fine. But I would like to see more javadoc for this method. ---
[GitHub] storm pull request #2771: STORM-3157: General improvement to StormMetricsReg...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2771#discussion_r207594931 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -50,19 +61,19 @@ public static void startMetricsReporters(Map topoConf) { } } -private static T register(final String name, T metric) { -T ret; +@Override +//This is more similar to super#getOrAdd than super#register +public T register(final String name, T metric) throws IllegalArgumentException { --- End diff -- if the metric is MetricSet, it will be having the same issue with the old code. ---
[GitHub] storm pull request #2763: STORM-3150: Improve Gauge registration methods and...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2763#discussion_r205500685 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -27,52 +28,63 @@ @SuppressWarnings("unchecked") public class StormMetricsRegistry { -public static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); +private static final MetricRegistry DEFAULT_REGISTRY = new MetricRegistry(); private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); -public static Meter registerMeter(String name) { -Meter meter = new Meter(); -return register(name, meter); +public static Meter registerMeter(final String name) { +return register(name, new Meter()); } -// TODO: should replace Callable to Gauge when nimbus.clj is translated to java -public static Gauge registerGauge(final String name, final Callable fn) { -Gauge gauge = new Gauge() { -@Override -public Integer getValue() { -try { -return (Integer) fn.call(); -} catch (Exception e) { -LOG.error("Error getting gauge value for {}", name, e); -} -return 0; +/** + * Register a gauge with provided callback. + * @param name name of the gauge + * @param fn callback that measures + * @param type of measurement the callback returns, also the type of gauge + * @return registered gauge + */ +public static Gauge registerGauge(final String name, final Callable fn) { +return register(name, () -> { +try { +return fn.call(); +} catch (Exception e) { +LOG.error("Error getting gauge value for {}", name, e); } -}; -return register(name, gauge); +return null; +}); } -public static void registerProvidedGauge(final String name, Gauge gauge) { +/** + * Register a provided gauge. Use this method if custom gauges is needed or + * no checked exceptions should be handled. 
+ * @param name name of the gauge + * @param gauge gauge + * @param type of value the gauge measures */ +public static void registerProvidedGauge(final String name, final Gauge gauge) { register(name, gauge); } public static Histogram registerHistogram(String name, Reservoir reservoir) { -Histogram histogram = new Histogram(reservoir); -return register(name, histogram); +return register(name, new Histogram(reservoir)); +} + +public static void registerAll(final String prefix, MetricSet metrics) { --- End diff -- I like the idea of having `registerMetricSet` and `unregisterMetricSet` in #2771 better. But to make the commit history easier to understand, I would suggest one of the following options: 1. Put the changes in the patch that actually uses these methods. 2. Port the changes in #2771 about `registerMetricSet` and `unregisterMetricSet` here. This avoids changing the same methods multiple times. I prefer option 1. ---
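The `registerGauge` javadoc in the diff above describes wrapping a checked `Callable` into a gauge that logs and returns `null` on failure instead of propagating the exception to the metrics reporter. A minimal JDK-only sketch of that wrapping, with `Supplier` standing in for Dropwizard's `Gauge` interface:

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

class SafeGauge {
    // Wrap a throwing Callable so metric reads never propagate exceptions;
    // a failed read yields a null sample instead.
    static <T> Supplier<T> wrap(String name, Callable<T> fn) {
        return () -> {
            try {
                return fn.call();
            } catch (Exception e) {
                System.err.println("Error getting gauge value for " + name + ": " + e);
                return null;
            }
        };
    }
}
```

This keeps the reporter thread safe from whatever the measurement callback does, at the cost of reporters having to tolerate `null` samples.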
[GitHub] storm issue #2714: STORM-3101: Fix unexpected metrics registration in StormM...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2714 Talked with @zd-project offline, I now have a better understanding of the problem he is trying to solve, as explained in https://issues.apache.org/jira/browse/STORM-3101. It's a really good catch. But we can discuss further how to address it. ---
[GitHub] storm issue #2714: STORM-3101: Add filter to StormMetricsRegistry.
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2714 Thanks for the contribution. It's good to explore more of the metrics functionality. I don't see a real need for adding filters since it's all Storm-internal code. But if you think it's better to have, I would suggest using a filter layer to make this more flexible. For example, create a filter interface with a filter function, e.g. ``` public interface IStormMetricsRegistryFilter { public default boolean filter(String metricName) { return false; } } ``` Then you can have a function (e.g. addFilter) in StormMetricsRegistryFilter to add a real implementation of the filter interface before StormMetricsRegistry starts to registerMeters(). The above is a very simple interface and might not be able to do much beyond filtering on the `String` parameter; you can think about this more. I think the current implementation in this PR won't work because you are calling `setsource()` too late. ---
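Following the filter-layer suggestion above, here is a minimal sketch of a name-based filter registry (the interface and class names are hypothetical; Dropwizard's own `MetricFilter` additionally receives the `Metric` instance, not just its name):

```java
import java.util.ArrayList;
import java.util.List;

class FilteredRegistry {
    // Hypothetical filter: return true to drop a metric by name.
    interface MetricNameFilter {
        boolean exclude(String metricName);
    }

    private final List<MetricNameFilter> filters = new ArrayList<>();

    // Filters must be added before registration starts, per the comment above.
    void addFilter(MetricNameFilter f) {
        filters.add(f);
    }

    // Register only if no filter excludes the name; returns whether it was kept.
    boolean register(String name) {
        for (MetricNameFilter f : filters) {
            if (f.exclude(name)) {
                return false;
            }
        }
        // ... the actual MetricRegistry.register(name, metric) call would go here ...
        return true;
    }
}
```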
[GitHub] storm issue #2718: STORM-3103 allow nimbus to shutdown properly
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2718 The code change makes sense to me. Just wondering whether the whole JVM is terminated immediately after exitProcess() is called. ---
[GitHub] storm issue #2700: [STORM-3093] Cache the storm id to executors mapping on m...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2700 @danny0405 No it's fine. Thanks for the contribution. ---
[GitHub] storm issue #2700: [STORM-3093] Cache the storm id to executors mapping on m...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2700 Looks good. Do you have any performance test results to share, so that we can get an idea of how much performance this patch gains? It's fine if you don't have them. ---