[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228300609
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -121,11 +121,68 @@
 private static String memoizedLocalHostnameString = null;
 public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
 
+public static final String NUMA_MEMORY_IN_MB = "MemoryInMB";
+public static final String NUMA_CORES = "Cores";
+public static final String NUMAS_PORTS = "Ports";
+public static final String NUMA_ID = "Id";
+public static final String NUMAS_BASE = "Numas";
+
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
+/**
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap = (Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) return validatedNumaMap;
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException("The numa configurations [" + NUMAS_BASE + "] is missing!");
+}
+List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException("The numa configuration key [" + key + "] is missing!");
+}
+}
+validatedNumaMap.put((String) numa.get(NUMA_ID), numa);
+}
+return validatedNumaMap;
+}
+
+/**
+ * getNumaIdForPort.
+ * @param port port
+ * @param stormConf stormConf
+ * @return getNumaIdForPort
+ * @throws KeyNotFoundException
+ */
+public static String getNumaIdForPort(Number port, Map stormConf) {
+Map validatedNumaMap = null;
+try {
+validatedNumaMap = getValidatedNumaMap(stormConf);
+} catch (KeyNotFoundException e) {
+LOG.error("Exception while getting NUMA config", e);
+return null;
--- End diff --

I agree that we should throw an exception if it's misconfigured. If NUMA is 
configured, we want to make sure it's working properly, or fail fast if it is 
misconfigured. Storm admins might not notice the "ERROR" message in the first 
place and could be left wondering (possibly for a relatively long time) why 
Storm is not working as expected. If they don't want NUMA, they simply won't set 
the NUMA configs. 
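To make the fail-fast argument concrete, here is a minimal sketch of a port lookup that lets a malformed NUMA config surface as an unchecked exception instead of logging an ERROR and returning null. The class name `NumaPortLookup` is hypothetical, the key names are taken from the later revision of the diff (`numas`, `node_id`, `ports`), and this is not the code in the PR.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: misconfiguration propagates to the caller (fail fast)
// instead of being logged and turned into a null NUMA id.
final class NumaPortLookup {
    // Key names follow the later revision of the diff.
    static final String NUMAS_BASE = "numas";
    static final String NUMA_ID = "node_id";
    static final String NUMAS_PORTS = "ports";

    /**
     * Returns the NUMA node id owning the port, or null if NUMA is not
     * configured or the port is not pinned to any node.
     * @throws IllegalArgumentException if NUMA is configured but malformed
     */
    static String getNumaIdForPort(Number port, Map<String, Object> numaMeta) {
        if (numaMeta == null) {
            return null; // NUMA not configured at all: nothing to pin.
        }
        Object entries = numaMeta.get(NUMAS_BASE);
        if (!(entries instanceof List)) {
            throw new IllegalArgumentException(
                "The NUMA configuration [" + NUMAS_BASE + "] is missing or not a list");
        }
        for (Object entry : (List<?>) entries) {
            Map<?, ?> numa = (Map<?, ?>) entry;
            Object id = numa.get(NUMA_ID);
            Object ports = numa.get(NUMAS_PORTS);
            if (id == null || !(ports instanceof List)) {
                throw new IllegalArgumentException("NUMA entry " + numa
                    + " needs [" + NUMA_ID + "] and [" + NUMAS_PORTS + "]");
            }
            for (Object p : (List<?>) ports) {
                // Compare as long so Integer and Long port values both match.
                if (((Number) p).longValue() == port.longValue()) {
                    return String.valueOf(id);
                }
            }
        }
        return null; // Port exists but is not assigned to any NUMA node.
    }
}
```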


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228303554
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -87,7 +87,7 @@ public Slot(AsyncLocalizer localizer, Map conf,
 AtomicReference> cachedCurrentAssignments,
 OnlyLatestExecutor metricsExec,
 WorkerMetricsProcessor metricsProcessor,
-SlotMetrics slotMetrics) throws Exception {
+SlotMetrics slotMetrics, String numaId) throws Exception {
--- End diff --

Can we remove this parameter? It doesn't look like it's used anywhere.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228307991
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
+"The numa configurations [" + NUMAS_BASE + "] is missing!"
+);
+}
+List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
--- End diff --

Since we calculate CPU capacity based on the size of `NUMA_CORES`, do we 
need to validate that there are no duplicates in `NUMA_CORES`? For example, what 
will happen if `NUMA_CORES` is `[0, 1, 1, 3]`?

Another example: 
```
{"node_id": 0, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6700, 6701]},
{"node_id": 1, "memory.mb": 122880, "cores": [ 0, 1], "ports": [6702, 6703]}
```
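A minimal sketch of the duplicate check being asked about, assuming the entries have already passed the required-key validation. `NumaCoreValidator` is a hypothetical name. Cores are compared by their string form so that a core parsed as an Integer and the same core parsed as a Long count as one (an assumption about how the YAML values end up typed). It rejects both cases raised above: a repeated core inside one node, and the same core claimed by two nodes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reject NUMA configs whose "cores" lists repeat a core,
// either within one node or across nodes.
final class NumaCoreValidator {
    static void validateDistinctCores(List<Map<String, Object>> numaEntries) {
        // Core (normalized to a string) -> id of the node that first listed it.
        Map<String, Object> coreOwner = new HashMap<>();
        for (Map<String, Object> numa : numaEntries) {
            Object nodeId = numa.get("node_id");
            Set<String> seenInNode = new HashSet<>();
            for (Object core : (List<?>) numa.get("cores")) {
                String key = String.valueOf(core);
                if (!seenInNode.add(key)) {
                    // e.g. "cores": [0, 1, 1, 3]
                    throw new IllegalArgumentException(
                        "Core " + key + " is listed more than once in NUMA node " + nodeId);
                }
                if (coreOwner.containsKey(key)) {
                    // e.g. node 0 and node 1 both list cores [0, 1]
                    throw new IllegalArgumentException("Core " + key
                        + " is assigned to both NUMA node " + coreOwner.get(key)
                        + " and NUMA node " + nodeId);
                }
                coreOwner.put(key, nodeId);
            }
        }
    }
}
```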


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228298411
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
--- End diff --

I think `KeyNotFoundException` is mostly used by the blobstore, so I don't 
think we should use it here. This looks more like an `IllegalArgumentException`.
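A short sketch of the required-key check rewritten with `IllegalArgumentException`. Because that exception is unchecked, `getValidatedNumaMap` would no longer need a `throws` clause, and callers such as `getNumaIdForPort` would no longer need the try/catch that logs and returns null. `NumaEntryValidator` is a hypothetical name, and the key strings come from the later revision of the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a malformed NUMA entry is reported with the JDK's
// IllegalArgumentException instead of the blobstore's KeyNotFoundException.
final class NumaEntryValidator {
    // Key names as in the later revision of the diff.
    private static final List<String> REQUIRED_KEYS =
        Arrays.asList("node_id", "cores", "memory.mb", "ports");

    static void requireKeys(Map<String, Object> numa) {
        for (String key : REQUIRED_KEYS) {
            if (!numa.containsKey(key)) {
                throw new IllegalArgumentException(
                    "The NUMA configuration key [" + key + "] is missing from " + numa);
            }
        }
    }
}
```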


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228321490
  
--- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---
@@ -319,7 +319,15 @@ public void testLaunch() throws Exception {
 superConf.put(Config.STORM_WORKERS_ARTIFACTS_DIR, stormLocal);
 superConf.put(DaemonConfig.STORM_LOG4J2_CONF_DIR, log4jdir);
 superConf.put(Config.WORKER_CHILDOPTS, " -Dtesting=true");
-
+Map numaNode = new HashMap();
+numaNode.put(Utils.NUMA_ID, "0");
+numaNode.put(Utils.NUMA_CORES, Collections.singletonList("0"));
+numaNode.put(Utils.NUMAS_PORTS, Collections.singletonList(8081));
+numaNode.put(Utils.NUMA_MEMORY_IN_MB, 2048);
+Map numaMap = new HashMap();
+numaMap.put(Utils.NUMAS_BASE, Collections.singletonList(numaNode));
+
+superConf.put(Config.SUPERVISOR_NUMA_META, numaMap);
--- End diff --

Can we remove this, since `testNumaPinnedLaunch()` is available?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228319880
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/normalization/NormalizedResourceOffer.java ---
@@ -81,6 +81,20 @@ public void add(NormalizedResourcesWithMemory other) {
 totalMemoryMb += other.getTotalMemoryMb();
 }
 
+/**
+ * Remove the resources in other from this.
+ * @param other the resources to be removed.
+ * @return true if one or more resources in other were larger than available resources in this, else false.
+ */
+public boolean remove(NormalizedResourcesWithMemory other) {
--- End diff --

Maybe change the signature to `public boolean remove(NormalizedResourcesWithMemory other, ResourceMetrics resourceMetrics)` 
and have the single-argument version just call `remove(other, null)`?
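A sketch of the suggested overload pattern: the two-argument method does the work and the one-argument method delegates with `null`, so the subtraction logic lives in one place. `ResourcesSketch`, `MetricsSketch` and `OfferSketch` are hypothetical stand-ins for `NormalizedResourcesWithMemory`, `ResourceMetrics` and `NormalizedResourceOffer`, reduced to memory only; the `negativeResourceEvents` counter is an assumed metric, not one taken from Storm.

```java
// Hypothetical stand-in for NormalizedResourcesWithMemory (memory only).
final class ResourcesSketch {
    final double memoryMb;

    ResourcesSketch(double memoryMb) {
        this.memoryMb = memoryMb;
    }
}

// Hypothetical stand-in for ResourceMetrics.
final class MetricsSketch {
    int negativeResourceEvents = 0; // assumed counter, for illustration only
}

// Hypothetical stand-in for NormalizedResourceOffer.
final class OfferSketch {
    double totalMemoryMb;

    OfferSketch(double totalMemoryMb) {
        this.totalMemoryMb = totalMemoryMb;
    }

    /** Convenience overload for callers that do not track metrics. */
    boolean remove(ResourcesSketch other) {
        return remove(other, null);
    }

    /**
     * Removes other from this offer.
     * @param metrics may be null, in which case nothing is recorded
     * @return true if other was larger than what was available
     */
    boolean remove(ResourcesSketch other, MetricsSketch metrics) {
        totalMemoryMb -= other.memoryMb;
        boolean negative = totalMemoryMb < 0;
        if (negative && metrics != null) {
            metrics.negativeResourceEvents++;
        }
        return negative;
    }
}
```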


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228298658
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -119,15 +120,81 @@
 // tests by subclassing.
 private static Utils _instance = new Utils();
 private static String memoizedLocalHostnameString = null;
-public static final Pattern TOPOLOGY_KEY_PATTERN = Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+public static final Pattern TOPOLOGY_KEY_PATTERN =
+Pattern.compile("^[\\w \\t\\._-]+$", Pattern.UNICODE_CHARACTER_CLASS);
+
+public static final String NUMA_MEMORY_IN_MB = "memory.mb";
+public static final String NUMA_CORES = "cores";
+public static final String NUMAS_PORTS = "ports";
+public static final String NUMA_ID = "node_id";
+public static final String NUMAS_BASE = "numas";
 
 static {
 localConf = readStormConfig();
 serializationDelegate = getSerializationDelegate(localConf);
 }
 
 /**
- * Provide an instance of this class for delegates to use.  To mock out delegated methods, provide an instance of a subclass that
+ * Validate supervisor numa configuration.
+ * @param stormConf stormConf
+ * @return getValidatedNumaMap
+ * @throws KeyNotFoundException
+ */
+public static Map getValidatedNumaMap(Map stormConf) throws KeyNotFoundException {
+Map validatedNumaMap = new HashMap();
+Map supervisorNumaMap =
+(Map) stormConf.get(Config.SUPERVISOR_NUMA_META);
+if (supervisorNumaMap == null) {
+return validatedNumaMap;
+}
+if (!supervisorNumaMap.containsKey(NUMAS_BASE)) {
+throw new KeyNotFoundException(
+"The numa configurations [" + NUMAS_BASE + "] is missing!"
+);
+}
+List numaEntries = (List) supervisorNumaMap.get(NUMAS_BASE);
+if (numaEntries == null) return validatedNumaMap;
+for (Map numa : numaEntries) {
+for (String key : new String[]{NUMA_ID, NUMA_CORES, NUMA_MEMORY_IN_MB, NUMAS_PORTS}) {
+if (!numa.containsKey(key)) {
+throw new KeyNotFoundException(
--- End diff --

Same here. I think `IllegalArgumentException` is better.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228301282
  
--- Diff: storm-client/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -327,10 +396,12 @@ public static boolean isSystemId(String id) {
 }
 
 /**
- * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds equal to the return value of the previous
+ * Creates a thread that calls the given code repeatedly, sleeping for an interval of seconds
+ * equal to the return value of the previous
  * call.
--- End diff --

nit: put "call." on the same line as the preceding words?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228321712
  
--- Diff: storm-server/src/test/java/org/apache/storm/daemon/supervisor/BasicContainerTest.java ---
@@ -390,6 +398,116 @@ public void testLaunch() throws Exception {
"storm.log.dir", stormLogDir);
 }
 
+@Test
+public void testNumaPinnedLaunch() throws Exception {
+final String topoId = "test_topology_current";
--- End diff --

It doesn't really matter, but `topoId = "test_topology_numa_pinned";` seems 
like a better name.


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228287958
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
 return ret;
 }
 
+/**
+ * Extracting out to mock it for tests.
+ * @return true if on Linux.
+ */
+protected boolean isOnLinux() {
+return SystemUtils.IS_OS_LINUX;
+}
+
+private void addNumaPinningIfApplicable(String numaId, List commandList) {
+if (numaId != null) {
+if (isOnLinux()) {
+commandList.add("numactl");
+commandList.add("--i=" + numaId);
+return;
+} else {
+// TODO : Add support for pinning on Windows host
--- End diff --

Should we throw an exception here?
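One way the non-Linux branch could fail fast, sketched under assumptions: `NumaPinning` is a hypothetical class, `UnsupportedOperationException` is one reasonable choice of exception rather than anything the PR settled on, and the `numactl` arguments are copied from the diff under review. `System.getProperty("os.name")` stands in for `SystemUtils.IS_OS_LINUX` only to keep the sketch dependency-free; `isOnLinux()` stays overridable so tests can exercise both branches, as in the original.

```java
import java.util.List;

// Hypothetical sketch: refuse to launch an unpinned worker when NUMA pinning
// was requested on a platform where it is not supported.
class NumaPinning {
    // Overridable for tests, like isOnLinux() in BasicContainer.
    protected boolean isOnLinux() {
        return System.getProperty("os.name", "").startsWith("Linux");
    }

    void addNumaPinningIfApplicable(String numaId, List<String> commandList) {
        if (numaId == null) {
            return; // Worker is not assigned to a NUMA node.
        }
        if (!isOnLinux()) {
            // Fail fast instead of silently ignoring the NUMA assignment.
            throw new UnsupportedOperationException(
                "NUMA pinning is only supported on Linux; numaId=" + numaId);
        }
        commandList.add("numactl");
        commandList.add("--i=" + numaId); // arguments copied from the diff under review
    }
}
```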


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228287568
  
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -622,22 +624,26 @@ public boolean areAllConnectionsReady() {
 return this.autoCredentials;
 }
 
-private List> readWorkerExecutors(IStormClusterState stormClusterState, String topologyId, String assignmentId,
+private List> readWorkerExecutors(IStormClusterState stormClusterState,
+ String topologyId, String assignmentId,
  int port) {
 LOG.info("Reading assignments");
 List> executorsAssignedToThisWorker = new ArrayList<>();
 executorsAssignedToThisWorker.add(Constants.SYSTEM_EXECUTOR_ID);
-Map, NodeInfo> executorToNodePort = getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
+Map, NodeInfo> executorToNodePort =
+getLocalAssignment(conf, stormClusterState, topologyId).get_executor_node_port();
 for (Map.Entry, NodeInfo> entry : executorToNodePort.entrySet()) {
 NodeInfo nodeInfo = entry.getValue();
-if (nodeInfo.get_node().equals(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
+if (nodeInfo.get_node().startsWith(assignmentId) && nodeInfo.get_port().iterator().next() == port) {
--- End diff --

Started a new / non-resolved conversation.

Could we have one assignment id `supervisor1` and another `supervisor11`? 
With `startsWith`, that would cause aliasing.

Instead of `startsWith`, can we split on a known delimiter, take the part 
before it, and make sure that matches exactly?
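A sketch of the aliasing problem and the delimiter-based exact match. `AssignmentIdMatcher` and `NUMA_DELIMITER` are hypothetical; the delimiter stands for some separator between a supervisor's assignment id and a NUMA suffix, and the real id format in the PR may differ. Splitting at the last occurrence assumes the NUMA id itself never contains the delimiter.

```java
// Hypothetical sketch: compare assignment ids exactly after stripping an
// assumed NUMA suffix, instead of using a prefix match.
final class AssignmentIdMatcher {
    static final String NUMA_DELIMITER = "-numa-"; // hypothetical separator

    // The check from the diff: "supervisor11".startsWith("supervisor1") is true.
    static boolean prefixMatch(String nodeId, String assignmentId) {
        return nodeId.startsWith(assignmentId);
    }

    // Cut the node id at the last delimiter (if any) and compare exactly.
    static boolean exactMatch(String nodeId, String assignmentId) {
        int idx = nodeId.lastIndexOf(NUMA_DELIMITER);
        String base = idx >= 0 ? nodeId.substring(0, idx) : nodeId;
        return base.equals(assignmentId);
    }
}
```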


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228266730
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SynchronizeAssignments.java ---
@@ -12,16 +12,24 @@
 
 package org.apache.storm.daemon.supervisor.timer;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228264081
  
--- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
@@ -77,6 +91,15 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
 }
 
 // Check for latest sequence number of a key inside zookeeper and return nimbodes containing the latest sequence number
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228266466
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---
@@ -17,14 +17,23 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
--- End diff --

```suggestion

```


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228227560
  
--- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
@@ -1041,6 +1041,19 @@
 @isPositiveNumber
 @NotNull
 public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
+
+/**
+ * A map with blobstore keys mapped to each NUMA Node on the supervisor that will be used
--- End diff --

What does `blobstore keys` mean here?


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228265639
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
@@ -607,6 +608,27 @@ protected String javaCmd(String cmd) {
 return ret;
 }
 
+/**
+ * Extracting out to mock it for tests.
+ * @return true if on Linux.
+ */
+protected boolean isOnLinux() {
+return SystemUtils.IS_OS_LINUX;
+}
+
+private void addNumaPinningIfApplicable(String numaId, List commandList) {
--- End diff --

This can be confusing, since we are actually adding `numactl` as a prefix, 
but the function reads as if it appends `numactl` to the end of `commandList` 
(even though `commandList` is empty when we call it).
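A sketch of one way to make the prepend explicit: build the prefix as its own list and concatenate, so the name no longer suggests appending. `NumaPrefix`, `getNumaPinningPrefix` and `buildCommand` are hypothetical names, and the `numactl` arguments are copied from the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: return the NUMA prefix instead of mutating a list, so
// it is obvious at the call site that numactl goes before the java command.
final class NumaPrefix {
    static List<String> getNumaPinningPrefix(String numaId) {
        if (numaId == null) {
            return Collections.emptyList(); // no pinning requested
        }
        List<String> prefix = new ArrayList<>();
        prefix.add("numactl");
        prefix.add("--i=" + numaId); // arguments copied from the diff under review
        return prefix;
    }

    // Prefix first, then the worker's own command line.
    static List<String> buildCommand(String numaId, List<String> workerCommand) {
        List<String> command = new ArrayList<>(getNumaPinningPrefix(numaId));
        command.addAll(workerCommand);
        return command;
    }
}
```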


---


[GitHub] storm pull request #2881: STORM-3259: NUMA Support for Storm

2018-10-25 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2881#discussion_r228264226
  
--- Diff: storm-server/src/main/java/org/apache/storm/blobstore/BlobStoreUtils.java ---
@@ -101,7 +124,11 @@ public static BlobKeySequenceInfo normalizeNimbusHostPortSequenceNumberInfo(Stri
 return nimbusInfoSet;
 }
 
-// Get sequence number details from latest sequence number of the blob
+/**
+ * Get sequence number details from latest sequence number of the blob
--- End diff --

```suggestion
 * Get sequence number details from latest sequence number of the blob.
```


---


[GitHub] storm pull request #2878: [STORM-3257] 'storm kill' command line should be a...

2018-10-25 Thread jacobtolar
Github user jacobtolar commented on a diff in the pull request:

https://github.com/apache/storm/pull/2878#discussion_r228237268
  
--- Diff: storm-core/src/jvm/org/apache/storm/command/KillTopology.java ---
@@ -25,21 +25,49 @@
 
 public static void main(String[] args) throws Exception {
 Map cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+.boolOpt("i", "ignore-errors")
--- End diff --

I'm always a little wary of changing default behavior, but I'd be happy to 
remove this option and just make it the default if that's acceptable. 


---


[GitHub] storm pull request #2878: [STORM-3257] 'storm kill' command line should be a...

2018-10-25 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2878#discussion_r228222037
  
--- Diff: storm-core/src/jvm/org/apache/storm/command/KillTopology.java ---
@@ -25,21 +25,49 @@
 
 public static void main(String[] args) throws Exception {
 Map cl = CLI.opt("w", "wait", null, CLI.AS_INT)
+.boolOpt("i", "ignore-errors")
--- End diff --

Do we even want/need this option? Is there any reason not to make this the 
default behavior?
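If ignoring errors became the default, the loop might look roughly like this sketch: keep killing the remaining topologies after a failure and report the failed names at the end, with stop-at-first-error still available. `KillAll` and `TopologyKiller` are hypothetical; `TopologyKiller` stands in for the real Nimbus client call, and nothing here reflects the actual `KillTopology` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "continue on error" when killing several topologies.
final class KillAll {
    // Stand-in for the real kill call against Nimbus.
    interface TopologyKiller {
        void kill(String name) throws Exception;
    }

    /**
     * Kills each topology in order.
     * @return names that failed (only non-empty when ignoreErrors is true)
     * @throws Exception the first failure, when ignoreErrors is false
     */
    static List<String> killAll(List<String> names, TopologyKiller killer, boolean ignoreErrors)
            throws Exception {
        List<String> failed = new ArrayList<>();
        for (String name : names) {
            try {
                killer.kill(name);
            } catch (Exception e) {
                if (!ignoreErrors) {
                    throw e; // old behavior: stop at the first failure
                }
                failed.add(name); // remember it and keep going
            }
        }
        return failed;
    }
}
```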


---