[GitHub] storm pull request #2882: STORM-3260: Add in support to print some state
GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2882

    STORM-3260: Add in support to print some state

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-3260

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2882

commit 6828ecae38c79522c3b7c9ab590d54ececf6d4c3
Author: Robert (Bobby) Evans
Date:   2018-10-17T22:02:40Z

    STORM-3260: Add in support to print some state

---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226082482

    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
    +        Map validatedNumaMap = new HashMap();
    +        Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    +        }
    +        for (Object numaEntry : validatedNumaMap.values()) {
    +            Map numaMap = (Map) numaEntry;
    +            List portList = (List) numaMap.get(NUMAS_PORTS);
    +            if (portList.contains(port)) {
    --- End diff --

    Yes. You could have a worker that's not part of a NUMA supervisor, in which case the numaID should be null and the worker would be launched without pinning.

---
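The behavior described in this reply (a port that no NUMA zone lists yields a null ID, and the worker then starts unpinned) can be sketched as follows. This is a minimal stand-alone illustration, not the code from the pull request: the class and method names are hypothetical, the zone map is assumed to be keyed by NUMA ID with a "Ports" list per zone, and the numeric comparison is a choice made for this sketch only.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names are hypothetical and this is not the code under review.
class NumaPortLookupSketch {

    // Returns the id of the NUMA zone whose "Ports" list contains the port,
    // or null when no zone lists it.
    static String numaIdForPort(int port, Map<String, Map<String, Object>> numaZones) {
        for (Map.Entry<String, Map<String, Object>> zone : numaZones.entrySet()) {
            Object ports = zone.getValue().get("Ports");
            if (!(ports instanceof List)) {
                continue; // zone without a usable port list
            }
            for (Object candidate : (List<?>) ports) {
                // Compare numerically so that an Integer and a Long with the same value match.
                if (candidate instanceof Number && ((Number) candidate).intValue() == port) {
                    return zone.getKey();
                }
            }
        }
        return null;
    }

    // Hypothetical caller: a null id is treated as "launch without pinning".
    static String launchMode(int port, Map<String, Map<String, Object>> numaZones) {
        String numaId = numaIdForPort(port, numaZones);
        return numaId == null ? "unpinned" : "pinned to NUMA zone " + numaId;
    }

    public static void main(String[] args) {
        Map<String, Object> zone0 = new HashMap<>();
        zone0.put("Ports", Arrays.asList(6700, 6701));
        Map<String, Map<String, Object>> zones = new HashMap<>();
        zones.put("0", zone0);

        System.out.println(launchMode(6700, zones));
        System.out.println(launchMode(6800, zones));
    }
}
```

Comparing by numeric value rather than with `List.contains` avoids a mismatch when the configured ports and the looked-up port are boxed as different `Number` types; whether that matters for the actual patch depends on how the config is parsed.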
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226082082

    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
             Map, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --

    It's an ID that links supervisors to their assignments. Since Nimbus thinks there are multiple NUMA supervisors, the real supervisor will now read all assignments linked to them.

---
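The switch from `equals` to `startsWith` explained above can be illustrated with a small stand-alone sketch. The node ID format used here (the real supervisor's ID followed by a per-zone suffix such as `-numa-0`) is an assumption made for illustration; the thread only shows that a prefix match is used. The class, method, and map layout are likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; the id format and all names are assumptions.
class AssignmentPrefixSketch {

    // Collects the ports of every node id that starts with the given assignment id,
    // so one real supervisor picks up the assignments of all its per-zone identities.
    static List<Integer> portsReadBy(String assignmentId, Map<String, List<Integer>> portsByNodeId) {
        List<Integer> ports = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : portsByNodeId.entrySet()) {
            if (entry.getKey().startsWith(assignmentId)) {
                ports.addAll(entry.getValue());
            }
        }
        Collections.sort(ports);
        return ports;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> assignments = new LinkedHashMap<>();
        assignments.put("sup-a-numa-0", Arrays.asList(6700)); // hypothetical per-zone id
        assignments.put("sup-a-numa-1", Arrays.asList(6701)); // hypothetical per-zone id
        assignments.put("sup-b", Arrays.asList(6702));        // unrelated supervisor

        System.out.println(portsReadBy("sup-a", assignments));
    }
}
```

One property of a plain prefix match worth keeping in mind: an assignment ID that happens to be a prefix of an unrelated supervisor's ID (for example `sup-a` and `sup-ab`) would also match, so the approach relies on IDs not overlapping in that way.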
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user govind-menon commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226081834

    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
    +        Map validatedNumaMap = new HashMap();
    +        Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --

    Not necessarily. It just means that the NUMA config is improperly formed or missing; the latter would be the default in normal clusters.

---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226060529

    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
    @@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
             Map, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
             for (Map.Entry, NodeInfo> entry : executorToNodePort.entrySet()) {
                 NodeInfo nodeInfo = entry.getValue();
    -            if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    +            if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
    --- End diff --

    What will assignmentId be? Why does this change?

---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226065664

    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
    +        Map validatedNumaMap = new HashMap();
    +        Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    +        }
    +        for (Object numaEntry : validatedNumaMap.values()) {
    +            Map numaMap = (Map) numaEntry;
    +            List portList = (List) numaMap.get(NUMAS_PORTS);
    +            if (portList.contains(port)) {
    --- End diff --

    Does it make sense to have a NUMA map and not be able to find a port? Should this be an error?

---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
Github user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2881#discussion_r226065179

    --- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -121,11 +121,68 @@
         private static String memoizedLocalHostnameString = null;
         public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
    +    public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
    +    public static final String NUMA_CORES = "Cores";
    +    public static final String NUMAS_PORTS = "Ports";
    +    public static final String NUMA_ID = "Id";
    +    public static final String NUMAS_BASE = "Numas";
    +
         static {
             localConf = readStormConfig();
             serializationDelegate = getSerializationDelegate(localConf);
         }
    +    /**
    +     * Validate supervisor numa configuration.
    +     * @param stormConf stormConf
    +     * @return getValidatedNumaMap
    +     * @throws KeyNotFoundException
    +     */
    +    public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
    +        Map validatedNumaMap = new HashMap();
    +        Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
    +        if (supervisorNumaMap == null) return validatedNumaMap;
    +        if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
    +            throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
    +        }
    +        List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
    +        if (numaEntries == null) return validatedNumaMap;
    +        for (Map numa : numaEntries) {
    +            for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
    +                if (!numa.containsKey(key)) {
    +                    throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
    +                }
    +            }
    +            validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
    +        }
    +        return validatedNumaMap;
    +    }
    +
    +    /**
    +     * getNumaIdForPort.
    +     * @param port port
    +     * @param stormConf stormConf
    +     * @return getNumaIdForPort
    +     * @throws KeyNotFoundException
    +     */
    +    public static String getNumaIdForPort(Number port, Map stormConf) {
    +        Map validatedNumaMap = null;
    +        try {
    +            validatedNumaMap = getValidatedNumaMap(stormConf);
    +        } catch (KeyNotFoundException e) {
    +            LOG.error("Exception while getting NUMA config", e);
    +            return null;
    --- End diff --

    This sounds like it should be a serious misconfiguration. I would think we should throw an exception?

---
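One way to reconcile the two positions in this exchange (a missing NUMA config is normal, a malformed one is a serious error) is to return an empty result when the config is absent and throw only when it is present but incomplete. The sketch below is a hedged, stand-alone illustration of that idea and not the code in the pull request: the class name is hypothetical, the method takes the NUMA section directly rather than the full Storm config, and `IllegalArgumentException` stands in for the `KeyNotFoundException` used in the diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not the validation code from the pull request.
class NumaConfigValidationSketch {
    static final String[] REQUIRED_KEYS = {"Id", "Cores", "MemoryInMB", "Ports"};

    // Absent config (null) is treated as "no NUMA" and yields an empty map.
    // A config that is present but incomplete is treated as an error and throws.
    static Map<String, Map<String, Object>> validate(Map<String, Object> numaMeta) {
        Map<String, Map<String, Object>> validated = new HashMap<>();
        if (numaMeta == null) {
            return validated;
        }
        if (!numaMeta.containsKey("Numas")) {
            throw new IllegalArgumentException("The numa configuration [Numas] is missing");
        }
        Object entries = numaMeta.get("Numas");
        if (entries == null) {
            return validated;
        }
        for (Object entry : (List<?>) entries) {
            Map<String, Object> zone = new HashMap<>();
            for (Map.Entry<?, ?> field : ((Map<?, ?>) entry).entrySet()) {
                zone.put(String.valueOf(field.getKey()), field.getValue());
            }
            for (String key : REQUIRED_KEYS) {
                if (!zone.containsKey(key)) {
                    throw new IllegalArgumentException("The numa configuration key [" + key + "] is missing");
                }
            }
            validated.put(String.valueOf(zone.get("Id")), zone);
        }
        return validated;
    }

    public static void main(String[] args) {
        Map<String, Object> zone = new HashMap<>();
        zone.put("Id", "0");
        zone.put("Cores", Arrays.asList(0, 1));
        zone.put("MemoryInMB", 1024);
        zone.put("Ports", Arrays.asList(6700, 6701));
        Map<String, Object> meta = new HashMap<>();
        meta.put("Numas", Arrays.asList(zone));

        System.out.println(validate(null).isEmpty());
        System.out.println(validate(meta).keySet());
    }
}
```

With this split, a caller can let the exception propagate for a malformed config while still treating an empty map as the ordinary non-NUMA case; whether the project prefers that over logging and returning null is the question under discussion here.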
[GitHub] storm issue #2881: STORM-3259: NUMA Support for Storm
Github user govind-menon commented on the issue:

    https://github.com/apache/storm/pull/2881

    Includes work by @kishorvpatil

---
[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm
GitHub user govind-menon opened a pull request:

    https://github.com/apache/storm/pull/2881

    STORM-3259: NUMA Support for Storm

    Only functional changes are included; this is up for review now, with tests to follow soon.

    Have done the following tests:
    1. Mixed cluster: NUMA supervisors and a normal supervisor
    2. NUMA-only supervisors
    3. Profiling actions
    4. Rebalance

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/govind-menon/storm STORM-3259

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2881

commit 243594eb14301a3367ddf6b0574afb13898cc4af
Author: Govind Menon
Date:   2018-10-07T20:11:43Z

    STORM-3259: NUMA Support for Storm

    Supervisor will heartbeat as multiple supervisors if there's an appropriate numa config

    [Numa] Fixing worker sync assignments

    [numa] Moving NUMA config validation to common Utils

---
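The commit message's idea of one supervisor heartbeating as several can be pictured with a small stand-alone sketch. Everything below is an assumption made for illustration, since the notification does not show the implementation: the per-zone ID format (`<supervisorId>-numa-<zoneId>`), the `Heartbeat` class, and the choice to report a single heartbeat under the real ID when no zones are configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch only; the id format and all types here are hypothetical.
class NumaHeartbeatSketch {

    static final class Heartbeat {
        final String supervisorId;
        final List<Integer> ports;

        Heartbeat(String supervisorId, List<Integer> ports) {
            this.supervisorId = supervisorId;
            this.ports = ports;
        }
    }

    // With no NUMA zones, report one heartbeat under the real id.
    // With zones, report one heartbeat per zone, each carrying that zone's ports.
    static List<Heartbeat> heartbeatsFor(String supervisorId, List<Integer> ownPorts,
                                         Map<String, List<Integer>> portsByNumaId) {
        List<Heartbeat> heartbeats = new ArrayList<>();
        if (portsByNumaId.isEmpty()) {
            heartbeats.add(new Heartbeat(supervisorId, ownPorts));
            return heartbeats;
        }
        // TreeMap gives a stable, sorted iteration order over zone ids.
        for (Map.Entry<String, List<Integer>> zone : new TreeMap<>(portsByNumaId).entrySet()) {
            heartbeats.add(new Heartbeat(supervisorId + "-numa-" + zone.getKey(), zone.getValue()));
        }
        return heartbeats;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> zones = new HashMap<>();
        zones.put("0", Arrays.asList(6700, 6701));
        zones.put("1", Arrays.asList(6702, 6703));

        for (Heartbeat hb : heartbeatsFor("sup-a", Arrays.asList(6700, 6701, 6702, 6703), zones)) {
            System.out.println(hb.supervisorId + " -> " + hb.ports);
        }
    }
}
```

Deriving each per-zone ID from the real supervisor ID is what would let a prefix comparison tie the zone-level assignments back to the one physical supervisor.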