[GitHub] storm pull request #2882: STORM-3260: Add in support to print some state

2018-10-17 Thread revans2
GitHub user revans2 opened a pull request:

https://github.com/apache/storm/pull/2882

STORM-3260: Add in support to print some state



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/revans2/incubator-storm STORM-3260

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2882


commit 6828ecae38c79522c3b7c9ab590d54ececf6d4c3
Author: Robert (Bobby) Evans 
Date:   2018-10-17T22:02:40Z

STORM-3260: Add in support to print some state




---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226082482
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + 
NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration 
key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map 
stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
+}
+for (Object numaEntry : validatedNumaMap.values()) {
+Map numaMap  = (Map) numaEntry;
+List portList = (List) 
numaMap.get(NUMAS_PORTS);
+if (portList.contains(port)) {
--- End diff --

Yes. A worker may not be part of a NUMA supervisor, in which case the 
numaId should be null and the worker would therefore be launched without 
pinning.
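
A minimal sketch of the lookup behaviour described in this comment, assuming a 
simplified map from NUMA zone id to port list. The class name, method name and 
map shape are hypothetical and are not the code in the PR:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class NumaPortLookupSketch {
    // Hypothetical stand-in for the validated NUMA map: zone id -> ports.
    static String numaIdForPort(int port, Map<String, List<Integer>> portsByNumaId) {
        for (Map.Entry<String, List<Integer>> zone : portsByNumaId.entrySet()) {
            if (zone.getValue().contains(port)) {
                return zone.getKey();
            }
        }
        // The port belongs to no NUMA zone: the caller launches the worker unpinned.
        return null;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> zones = new LinkedHashMap<>();
        zones.put("0", Arrays.asList(6700, 6701));
        zones.put("1", Arrays.asList(6702, 6703));
        System.out.println(numaIdForPort(6702, zones)); // port listed under zone "1"
        System.out.println(numaIdForPort(6710, zones)); // port listed nowhere
    }
}
```

A null result here is treated as "no pinning" rather than as an error, which is 
the behaviour the comment argues for.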


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226082082
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
 Map<List<Long>, NodeInfo> executorToNodePort = 
getLocalAssignment(conf, stormClusterState, 
topologyId).get_executor_node_port();
 for (Map.Entry<List<Long>, NodeInfo> entry : 
executorToNodePort.entrySet()) {
 NodeInfo nodeInfo = entry.getValue();
-if (nodeInfo.get_node().equals(assignmentId) && 
nodeInfo.get_port().iterator().next() == port) {
+if (nodeInfo.get_node().startsWith(assignmentId) && 
nodeInfo.get_port().iterator().next() == port) {
--- End diff --

It's an ID that links supervisors to their assignments. Since Nimbus 
thinks there are multiple NUMA supervisors, the real supervisor will now read 
all assignments linked to them.
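
A minimal sketch of why the diff switches from equals to startsWith, assuming 
each NUMA pseudo-supervisor registers under a node id formed from the real 
assignmentId plus a suffix. The suffix format and all names below are 
hypothetical; the thread does not state the actual id format:

```java
final class AssignmentMatchSketch {
    // Mirrors the changed condition: prefix match on the node id, exact match on the port.
    static boolean isLocal(String node, String assignmentId, int nodePort, int workerPort) {
        return node.startsWith(assignmentId) && nodePort == workerPort;
    }

    public static void main(String[] args) {
        String assignmentId = "supervisor-abc";        // hypothetical real supervisor id
        String numaNode = assignmentId + "-numa-0";    // hypothetical NUMA pseudo-supervisor id

        // An exact comparison no longer recognises the NUMA node id as local.
        System.out.println(numaNode.equals(assignmentId));
        // The prefix comparison does, provided the port also matches.
        System.out.println(isLocal(numaNode, assignmentId, 6700, 6700));
    }
}
```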


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread govind-menon
Github user govind-menon commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226081834
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + 
NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration 
key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map 
stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
--- End diff --

Not necessarily. It just means that the NUMA config is improperly formed 
or missing; the latter would be the default in normal clusters.
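
A minimal sketch of the validation outcomes visible in the quoted diff, assuming 
a plain nested-map config. The config key string is an assumption, and 
IllegalArgumentException stands in for Storm's KeyNotFoundException so the 
sketch is self-contained:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class NumaConfigSketch {
    static final String NUMA_META_KEY = "supervisor.numa.meta"; // assumed key string
    static final String NUMAS_BASE = "Numas";
    static final List<String> REQUIRED = Arrays.asList("Id", "Cores", "MemoryInMB", "Ports");

    @SuppressWarnings("unchecked")
    static Map<String, Map<String, Object>> validate(Map<String, Object> conf) {
        Map<String, Map<String, Object>> validated = new HashMap<>();
        Map<String, Object> meta = (Map<String, Object>) conf.get(NUMA_META_KEY);
        if (meta == null) {
            return validated; // no NUMA section at all: the normal, non-NUMA cluster
        }
        if (!meta.containsKey(NUMAS_BASE)) {
            throw new IllegalArgumentException("[" + NUMAS_BASE + "] is missing");
        }
        List<Map<String, Object>> entries = (List<Map<String, Object>>) meta.get(NUMAS_BASE);
        if (entries == null) {
            return validated;
        }
        for (Map<String, Object> numa : entries) {
            for (String key : REQUIRED) {
                if (!numa.containsKey(key)) {
                    throw new IllegalArgumentException("NUMA key [" + key + "] is missing");
                }
            }
            validated.put((String) numa.get("Id"), numa);
        }
        return validated;
    }

    public static void main(String[] args) {
        // A config without any NUMA section validates to an empty map.
        System.out.println(validate(new HashMap<>()).isEmpty());
    }
}
```

In this sketch an absent NUMA section yields an empty map, while a section 
that is present but malformed raises an exception that the caller must decide 
to log or rethrow.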


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226060529
  
--- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -630,7 +632,7 @@ public boolean areAllConnectionsReady() {
 Map<List<Long>, NodeInfo> executorToNodePort = 
getLocalAssignment(conf, stormClusterState, 
topologyId).get_executor_node_port();
 for (Map.Entry<List<Long>, NodeInfo> entry : 
executorToNodePort.entrySet()) {
 NodeInfo nodeInfo = entry.getValue();
-if (nodeInfo.get_node().equals(assignmentId) && 
nodeInfo.get_port().iterator().next() == port) {
+if (nodeInfo.get_node().startsWith(assignmentId) && 
nodeInfo.get_port().iterator().next() == port) {
--- End diff --

What will assignmentId be? Why does this change?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226065664
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + 
NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration 
key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map 
stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
+}
+for (Object numaEntry : validatedNumaMap.values()) {
+Map numaMap  = (Map) numaEntry;
+List portList = (List) 
numaMap.get(NUMAS_PORTS);
+if (portList.contains(port)) {
--- End diff --

Does it make sense to have a NUMA map and not be able to find a port? 
Should this be an error?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r226065179
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = 
Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) 
stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + 
NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) 
supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, 
NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration 
key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map 
stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
--- End diff --

This sounds like a serious misconfiguration. I would think we should 
throw an exception?


---


[GitHub] storm issue #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread govind-menon
Github user govind-menon commented on the issue:

https://github.com/apache/storm/pull/2881
  
Includes work by @kishorvpatil 


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-17 Thread govind-menon
GitHub user govind-menon opened a pull request:

https://github.com/apache/storm/pull/2881

STORM-3259: NUMA Support for Storm

Only functional changes; putting this up for review now, with tests to 
follow soon.

I have done the following tests:

1. Mixed cluster - NUMA supervisors and a normal supervisor
2. NUMA-only supervisors
3. Profiling actions
4. Rebalance

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/govind-menon/storm STORM-3259

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2881.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2881


commit 243594eb14301a3367ddf6b0574afb13898cc4af
Author: Govind Menon 
Date:   2018-10-07T20:11:43Z

STORM-3259: NUMA Support for Storm

Supervisor will heartbeat as multiple supervisors if there's an appropriate 
NUMA config

[NUMA] Fixing worker sync assignments

[NUMA] Moving NUMA config validation to common Utils




---