[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2918


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-06 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239522894
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
                     }
                 }
+                // make sure we've handled all supervisors on the host before we break
+                if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
+                    // we have enough resources now...
+                    break;
--- End diff --

I am doing what you are indicating. I create a list of the supervisors on 
the host and release all of them before breaking.


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239517824
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --

@danny0405 This code only runs when the cluster is full, meaning a 
topology is not scheduled and there are not enough free resources on the 
cluster to run it. The idea is that if a cluster is full and there are 
blacklisted supervisors, it is better to try to run things on possibly bad 
hosts than to run nothing at all. The issue here is that the existing code 
is broken when there are multiple supervisors on a single node, and this 
change fixes it to operate properly in that case.

If you have objections to releasing blacklisted supervisors/nodes at all, 
that is a separate JIRA.


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-06 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239519351
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
                     }
                 }
+                // make sure we've handled all supervisors on the host before we break
+                if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
+                    // we have enough resources now...
+                    break;
--- End diff --

I think we need to break only after we have released all of the supervisors 
on the node; otherwise we could release a single supervisor on a node, get 
enough resources, and break, but because at least one supervisor on that 
node is still blacklisted, the whole node remains blacklisted.
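
For concreteness, here is a minimal, self-contained sketch of the control 
flow being discussed. The names are borrowed from the diff, but the resource 
bookkeeping is reduced to a plain slot count; nothing here is the actual PR 
code:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BreakPlacementSketch {
    // Returns the supervisors to release, always releasing whole hosts at a time.
    static List<String> release(Map<String, Set<String>> hostToSupervisorIds,
                                Map<String, Integer> slotsPerSupervisor,
                                int shortageSlots) {
        List<String> readyToRemove = new ArrayList<>();
        for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
            for (String supervisorId : supervisorIds) {
                // release every supervisor on this host, even if the shortage
                // is already covered partway through the inner loop
                readyToRemove.add(supervisorId);
                shortageSlots -= slotsPerSupervisor.getOrDefault(supervisorId, 0);
            }
            // only break once the whole host has been handled, so the host
            // does not stay blacklisted because of a leftover supervisor
            if (shortageSlots <= 0) {
                break;
            }
        }
        return readyToRemove;
    }
}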


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-06 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239461731
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --

It sounds like you don't agree even with the existing code that releases 
blacklisted supervisors when needed for scheduling?

Should I be looking at converting all the existing code to blacklist 
supervisors instead of hosts, to get things functional when there is more 
than one supervisor on a host?

Thanks.



---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-05 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239306840
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --

@agresch I see your point.

In my opinion, for production clusters, if a node has a blacklisted 
supervisor (meaning the supervisor has not sent any heartbeats for some 
time), in most cases it is because the machine itself has a problem (common 
causes include a full disk or a lost network connection), so the safer 
approach is to never schedule workers onto a node that has any blacklisted 
supervisors on it.

If you want to make use of a healthy supervisor on a node that also has 
blacklisted supervisors, we should at least verify that the supervisor 
really is healthy, which we can do by checking its heartbeats.
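
As a rough illustration of that suggestion, a heartbeat-freshness gate might 
look like the sketch below. None of these names are real Storm APIs; the 
heartbeat map and timeout are assumptions:

import java.util.Map;

class SupervisorHealthSketch {
    // A supervisor counts as healthy only if its last heartbeat arrived
    // within the timeout window.
    static boolean isHealthy(String supervisorId, Map<String, Long> lastHeartbeatMs,
                             long nowMs, long timeoutMs) {
        Long lastHb = lastHeartbeatMs.get(supervisorId); // null if never seen
        return lastHb != null && (nowMs - lastHb) <= timeoutMs;
    }
}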


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-05 Thread agresch
Github user agresch commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239066404
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --

If there are two supervisors on a host that are blacklisted and we release 
one, the other supervisor will remain blacklisted. This single blacklisted 
supervisor will be returned here:

https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167

Then this code will see that the host both supervisors are on is still 
blacklisted, preventing any scheduling on that node:

https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167

I think a better long-term solution overall is to blacklist supervisors 
instead of hosts, but that touches a lot more code. In the short term I 
think this is a relatively small tradeoff to allow blacklisting to work 
with multiple supervisors per host.
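
A minimal sketch of the effect described above (assumed logic, not the 
actual BlacklistScheduler implementation):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class HostBlacklistSketch {
    // A host is treated as blacklisted if ANY of its supervisors is still in
    // the blacklist, so releasing one of two supervisors does not free the host.
    static Set<String> blacklistedHosts(Set<String> blacklistedSupervisorIds,
                                        Map<String, String> supervisorIdToHost) {
        Set<String> hosts = new HashSet<>();
        for (String supervisorId : blacklistedSupervisorIds) {
            String host = supervisorIdToHost.get(supervisorId);
            if (host != null) {
                hosts.add(host); // one remaining blacklisted supervisor is enough
            }
        }
        return hosts;
    }
}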




---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-05 Thread danny0405
Github user danny0405 commented on a diff in the pull request:

https://github.com/apache/storm/pull/2918#discussion_r239039664
  
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
 
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                                sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --

So what is the purpose of releasing blacklisted supervisors grouped by 
node? What if the node is stuck or broken? If we instead shuffled the 
releases across nodes, the risk might be spread more evenly.
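
For reference, the alternative being suggested might look like this sketch 
(hypothetical; not what this PR implements):

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class ShuffledReleaseSketch {
    // Randomize the release order so the risk of releasing a genuinely broken
    // node is spread evenly, rather than always releasing the earliest entries.
    static List<String> releaseOrder(Collection<String> blacklistedSupervisorIds) {
        List<String> order = new ArrayList<>(blacklistedSupervisorIds);
        Collections.shuffle(order);
        return order;
    }
}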


---


[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...

2018-12-04 Thread agresch
GitHub user agresch opened a pull request:

https://github.com/apache/storm/pull/2918

STORM-3295 allow blacklist scheduling to function properly with multi…

…ple supervisors on a host

If any supervisor on a host remains blacklisted, 
BlacklistScheduler.getBlacklistHosts() still considers the host blacklisted. 
This change forces all supervisors on a host to be released together, which 
will free the host.

It may be nicer to blacklist strictly based on supervisors rather than 
hosts, but that is open to discussion, and a much larger change than this.
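
For reviewers, here is a sketch of what the new grouping helper might look 
like. The PR's diff calls createHostToSupervisorMap(blacklistedNodeIds, 
cluster); the body below is an assumption based on that diff, with the host 
lookup abstracted into a plain map instead of the real Cluster API:

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class HostGroupingSketch {
    // Group blacklisted supervisor ids by the host they run on, so a whole
    // host's worth of supervisors can be released together.
    static Map<String, Set<String>> createHostToSupervisorMap(
            Collection<String> blacklistedSupervisorIds,
            Map<String, String> supervisorIdToHost) {
        Map<String, Set<String>> hostToSupervisorIds = new HashMap<>();
        for (String supervisorId : blacklistedSupervisorIds) {
            String host = supervisorIdToHost.get(supervisorId);
            if (host != null) {
                hostToSupervisorIds.computeIfAbsent(host, h -> new HashSet<>())
                        .add(supervisorId);
            }
        }
        return hostToSupervisorIds;
    }
}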



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/agresch/storm agresch_blacklist

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2918


commit b4d3df2167b2962bdf7039ba064ad7a646f32644
Author: Aaron Gresch 
Date:   2018-12-04T22:23:35Z

STORM-3295 allow blacklist scheduling to function properly with multiple 
supervisors on a host




---