Update NonProvisioningController

- Bring the code more inline with how AbstractController is written.
- Tested it with the downstream project:
  https://github.com/cloudsoft/brooklyn-aws-elb/


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/7e695d19
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/7e695d19
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/7e695d19

Branch: refs/heads/master
Commit: 7e695d19ef599d2d67ab81822106781db6bbfe33
Parents: 3bdd17e
Author: Aled Sage <aled.s...@gmail.com>
Authored: Fri Feb 19 19:37:06 2016 +0000
Committer: Aled Sage <aled.s...@gmail.com>
Committed: Fri Mar 11 22:28:25 2016 +0000

----------------------------------------------------------------------
 .../entity/proxy/AbstractControllerImpl.java    |   2 +-
 .../proxy/AbstractNonProvisionedController.java |   4 +
 .../AbstractNonProvisionedControllerImpl.java   | 321 ++++++++++++-------
 3 files changed, 219 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java
 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java
index 47ef7a9..257882d 100644
--- 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java
+++ 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java
@@ -499,7 +499,7 @@ public abstract class AbstractControllerImpl extends 
SoftwareProcessImpl impleme
     }
 
     // Utilities for modifying an AttributeSensor of type map
-    private static class MapAttribute {
+    static class MapAttribute {
         public static <K, V> V put(Entity entity, AttributeSensor<Map<K,V>> 
attribute, K key, V value) {
             Map<K, V> oldMap = entity.getAttribute(attribute);
             Map<K, V> newMap = MutableMap.copyOf(oldMap);

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java
----------------------------------------------------------------------
diff --git 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java
 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java
index 70df171..cbcfe65 100644
--- 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java
+++ 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java
@@ -18,6 +18,8 @@
  */
 package org.apache.brooklyn.entity.proxy;
 
+import java.util.Set;
+
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.ImplementedBy;
 
@@ -25,4 +27,6 @@ import org.apache.brooklyn.api.entity.ImplementedBy;
 public interface AbstractNonProvisionedController extends LoadBalancer, Entity 
{
 
     public boolean isActive();
+    
+    Set<String> getServerPoolAddresses();
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java
----------------------------------------------------------------------
diff --git 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java
 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java
index 137be36..c26235a 100644
--- 
a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java
+++ 
b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java
@@ -18,9 +18,9 @@
  */
 package org.apache.brooklyn.entity.proxy;
 
-import static com.google.common.base.Preconditions.checkState;
 import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
 
+import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
@@ -28,20 +28,28 @@ import java.util.Set;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.Group;
 import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.policy.Policy;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.core.entity.AbstractEntity;
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
 import org.apache.brooklyn.core.entity.trait.Startable;
 import org.apache.brooklyn.core.feed.ConfigToAttributes;
 import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy;
-import org.apache.brooklyn.entity.group.Cluster;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.entity.proxy.AbstractControllerImpl.MapAttribute;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 public abstract class AbstractNonProvisionedControllerImpl extends 
AbstractEntity implements AbstractNonProvisionedController {
     
@@ -51,25 +59,9 @@ public abstract class AbstractNonProvisionedControllerImpl 
extends AbstractEntit
     protected volatile boolean updateNeeded = true;
     
     protected AbstractMembershipTrackingPolicy serverPoolMemberTrackerPolicy;
-    protected Set<String> serverPoolAddresses = Sets.newLinkedHashSet();
-    protected Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap();
+    protected final Object mutex = new Object();
     
     public AbstractNonProvisionedControllerImpl() {
-        this(MutableMap.of(), null, null);
-    }
-    public AbstractNonProvisionedControllerImpl(Map<?,?> properties) {
-        this(properties, null, null);
-    }
-    public AbstractNonProvisionedControllerImpl(Entity parent) {
-        this(MutableMap.of(), parent, null);
-    }
-    public AbstractNonProvisionedControllerImpl(Map<?,?> properties, Entity 
parent) {
-        this(properties, parent, null);
-    }
-    public AbstractNonProvisionedControllerImpl(Entity parent, Cluster 
cluster) {
-        this(MutableMap.of(), parent, cluster);
-    }
-    public AbstractNonProvisionedControllerImpl(Map<?,?> properties, Entity 
parent, Cluster cluster) {
     }
 
     public static class MemberTrackingPolicy extends 
AbstractMembershipTrackingPolicy {
@@ -78,6 +70,27 @@ public abstract class AbstractNonProvisionedControllerImpl 
extends AbstractEntit
         }
     }
 
+    @Override
+    public void init() {
+        super.init();
+        sensors().set(SERVER_POOL_TARGETS, ImmutableMap.<Entity, String>of());
+    }
+
+    @Override
+    public void rebind() {
+        super.rebind();
+        
+        // For backwards compatibility, in case anything persisted before this 
was added
+        if (sensors().get(SERVER_POOL_TARGETS) == null) {
+            sensors().set(SERVER_POOL_TARGETS, ImmutableMap.<Entity, 
String>of());
+        }
+    }
+
+    @Override
+    public Set<String> getServerPoolAddresses() {
+        return 
ImmutableSet.copyOf(Iterables.filter(getAttribute(SERVER_POOL_TARGETS).values(),
 Predicates.notNull()));
+    }
+
     /**
      * Opportunity to do late-binding of the cluster that is being controlled. 
Must be called before start().
      * Can pass in the 'serverPool'.
@@ -96,96 +109,179 @@ public abstract class 
AbstractNonProvisionedControllerImpl extends AbstractEntit
     
     @Override
     public void start(Collection<? extends Location> locations) {
+        // TODO execute as tasks?
         preStart();
+        doStart(locations);
+        postStart();
     }
 
     @Override
     public void stop() {
+        // TODO execute as tasks?
         preStop();
+        doStop();
+        postStop();
     }
     
     protected void preStart() {
-        AttributeSensor<?> hostAndPortSensor = getConfig(HOST_AND_PORT_SENSOR);
-        Maybe<Object> hostnameSensor = config().getRaw(HOSTNAME_SENSOR);
-        Maybe<Object> portSensor = config().getRaw(PORT_NUMBER_SENSOR);
-        if (hostAndPortSensor != null) {
-            checkState(!hostnameSensor.isPresent() && !portSensor.isPresent(), 
-                    "Must not set %s and either of %s or %s", 
HOST_AND_PORT_SENSOR, HOSTNAME_SENSOR, PORT_NUMBER_SENSOR);
-        }
-        
         ConfigToAttributes.apply(this);
-        addServerPoolMemberTrackerPolicy();
+    }
+    
+    protected void doStart(Collection<? extends Location> locations) {
+    }
+
+    protected void postStart() {
+        sensors().set(PROTOCOL, inferProtocol());
+        sensors().set(MAIN_URI, URI.create(inferUrl()));
+        sensors().set(ROOT_URL, inferUrl());
+        addServerPoolMemberTrackingPolicy();
     }
     
     protected void preStop() {
-        removeServerPoolMemberTrackerPolicy();
+        removeServerPoolMemberTrackingPolicy();
     }
-        
-    protected void addServerPoolMemberTrackerPolicy() {
+
+    protected void doStop() {
+    }
+    
+    protected void postStop() {
+    }
+
+    protected abstract String inferProtocol();
+    
+    protected abstract String inferUrl();
+
+    protected void addServerPoolMemberTrackingPolicy() {
         Group serverPool = getServerPool();
-        if (serverPool != null) {
-            serverPoolMemberTrackerPolicy = 
policies().add(PolicySpec.create(MemberTrackingPolicy.class)
-                    .displayName("Controller targets tracker")
-                    .configure("group", serverPool));
-            
-            LOG.info("Added policy {} to {}, during start", 
serverPoolMemberTrackerPolicy, this);
-            
-            serverPoolAddresses.clear();
-            serverPoolTargets.clear();
-                
-            // Initialize ourselves immediately with the latest set of 
members; don't wait for
-            // listener notifications because then will be out-of-date for 
short period (causing 
-            // problems for rebind)
-            for (Entity member : getServerPool().getMembers()) {
-                if (belongsInServerPool(member)) {
-                    if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity 
{}", this, member);
-                    String address = getAddressOfEntity(member);
-                    serverPoolTargets.put(member, address);
-                    if (address != null) {
-                        serverPoolAddresses.add(address);
-                    }
-                }
+        if (serverPool == null) {
+            return; // no-op
+        }
+        if (serverPoolMemberTrackerPolicy != null) {
+            LOG.debug("Call to addServerPoolMemberTrackingPolicy when 
serverPoolMemberTrackingPolicy already exists, removing and re-adding, in {}", 
this);
+            removeServerPoolMemberTrackingPolicy();
+        }
+        for (Policy p: policies()) {
+            if (p instanceof ServerPoolMemberTrackerPolicy) {
+                // TODO want a more elegant idiom for this!
+                LOG.info(this+" picking up "+p+" as the tracker (already set, 
often due to rebind)");
+                serverPoolMemberTrackerPolicy = 
(ServerPoolMemberTrackerPolicy) p;
+                return;
             }
-            
-            LOG.info("Resetting {}, members {} with addresses {}", new 
Object[] {this, serverPoolTargets, serverPoolAddresses});
-            sensors().set(SERVER_POOL_TARGETS, serverPoolTargets);
         }
+        
+        serverPoolMemberTrackerPolicy = 
policies().add(PolicySpec.create(MemberTrackingPolicy.class)
+                .displayName("Controller targets tracker")
+                .configure("group", serverPool));
+        
+        AttributeSensor<?> hostAndPortSensor = getConfig(HOST_AND_PORT_SENSOR);
+        AttributeSensor<?> hostnameSensor = getConfig(HOSTNAME_SENSOR);
+        AttributeSensor<?> portSensor = getConfig(PORT_NUMBER_SENSOR);
+        Set<AttributeSensor<?>> sensorsToTrack;
+        if (hostAndPortSensor != null) {
+            sensorsToTrack = 
ImmutableSet.<AttributeSensor<?>>of(hostAndPortSensor);
+        } else {
+            sensorsToTrack = 
ImmutableSet.<AttributeSensor<?>>of(hostnameSensor, portSensor);
+        }
+        
+        serverPoolMemberTrackerPolicy = 
policies().add(PolicySpec.create(ServerPoolMemberTrackerPolicy.class)
+                .displayName("Controller targets tracker")
+                .configure("group", serverPool)
+                .configure("sensorsToTrack", sensorsToTrack));
+
+        LOG.info("Added policy {} to {}", serverPoolMemberTrackerPolicy, this);
+        
+        
+        // Initialize ourselves immediately with the latest set of members; 
don't wait for
+        // listener notifications because then will be out-of-date for short 
period (causing 
+        // problems for rebind)
+        Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap();
+        for (Entity member : serverPool.getMembers()) {
+            if (belongsInServerPool(member)) {
+                if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", 
this, member);
+                String address = getAddressOfEntity(member);
+                serverPoolTargets.put(member, address);
+            }
+        }
+
+        LOG.info("Resetting {}, server pool targets {}", new Object[] {this, 
serverPoolTargets});
+        sensors().set(SERVER_POOL_TARGETS, serverPoolTargets);
     }
     
-    protected void removeServerPoolMemberTrackerPolicy() {
+    protected void removeServerPoolMemberTrackingPolicy() {
         if (serverPoolMemberTrackerPolicy != null) {
             policies().remove(serverPoolMemberTrackerPolicy);
         }
     }
     
+    public static class ServerPoolMemberTrackerPolicy extends 
AbstractMembershipTrackingPolicy {
+        @Override
+        protected void onEntityEvent(EventType type, Entity entity) {
+            // relies on policy-rebind injecting the implementation rather 
than the dynamic-proxy
+            
((AbstractNonProvisionedControllerImpl)super.entity).onServerPoolMemberChanged(entity);
+        }
+    }
+
     /** 
      * Implementations should update the configuration so that 
'serverPoolAddresses' are targeted.
      * The caller will subsequently call reload to apply the new configuration.
      */
     protected abstract void reconfigureService();
     
+    public void updateNeeded() {
+        synchronized (mutex) {
+            if (updateNeeded) return;
+            updateNeeded = true;
+            LOG.debug("queueing an update-needed task for "+this+"; update 
will occur shortly");
+            Entities.submit(this, 
Tasks.builder().displayName("update-needed").body(new Runnable() {
+                @Override
+                public void run() {
+                    if (updateNeeded)
+                        AbstractNonProvisionedControllerImpl.this.update();
+                } 
+            }).build());
+        }
+    }
+    
     @Override
-    public synchronized void update() {
-        if (!isActive()) updateNeeded = true;
-        else {
-            updateNeeded = false;
-            LOG.debug("Updating {} in response to changes", this);
-            reconfigureService();
-            LOG.debug("Reloading {} in response to changes", this);
-            invoke(RELOAD);
+    public void update() {
+        try {
+            Task<?> task = updateAsync();
+            if (task != null) task.getUnchecked();
+            
ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(this, "update");
+        } catch (Exception e) {
+            
ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(this, "update", 
"update failed with: "+Exceptions.collapseText(e));
+            throw Exceptions.propagate(e);
         }
-        sensors().set(SERVER_POOL_TARGETS, serverPoolTargets);
     }
     
-    protected synchronized void onServerPoolMemberChanged(Entity member) {
-        if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of 
{} which is in locations {}", 
-                new Object[] {this, member, member.getLocations()});
-        if (belongsInServerPool(member)) {
-            addServerPoolMember(member);
-        } else {
-            removeServerPoolMember(member);
+    public Task<?> updateAsync() {
+        synchronized (mutex) {
+            Task<?> result = null;
+            if (!isActive()) updateNeeded = true;
+            else {
+                updateNeeded = false;
+                LOG.debug("Updating {} in response to changes", this);
+                LOG.info("Updating {}, server pool targets {}", new Object[] 
{this, getAttribute(SERVER_POOL_TARGETS)});
+                reconfigureService();
+                LOG.debug("Reloading {} in response to changes", this);
+                invoke(RELOAD);
+            }
+            return result;
+        }
+    }
+
+    
+    protected void onServerPoolMemberChanged(Entity member) {
+        synchronized (mutex) {
+            if (LOG.isTraceEnabled()) LOG.trace("For {}, considering 
membership of {} which is in locations {}", 
+                    new Object[] {this, member, member.getLocations()});
+            if (belongsInServerPool(member)) {
+                addServerPoolMember(member);
+            } else {
+                removeServerPoolMember(member);
+            }
+            if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", 
this, member);
         }
-        if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, 
member);
     }
     
     protected boolean belongsInServerPool(Entity member) {
@@ -217,40 +313,51 @@ public abstract class 
AbstractNonProvisionedControllerImpl extends AbstractEntit
         return getAttribute(HOST_AND_PORT_SENSOR);
     }
 
-    protected synchronized void addServerPoolMember(Entity member) {
-        if (serverPoolTargets.containsKey(member)) {
-            if (LOG.isTraceEnabled()) LOG.trace("For {}, not adding as already 
have member {}", new Object[] {this, member});
-            return;
-        }
-        
-        String address = getAddressOfEntity(member);
-        if (address != null) {
-            serverPoolAddresses.add(address);
-        }
+    protected void addServerPoolMember(Entity member) {
+        synchronized (mutex) {
+            String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member);
+            String newAddress = getAddressOfEntity(member);
+            if (Objects.equal(newAddress, oldAddress)) {
+                if (LOG.isTraceEnabled())
+                    if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged 
address {}", oldAddress);
+                return;
+            } else if (newAddress == null) {
+                LOG.info("Removing from {}, member {} with old address {}, 
because inferred address is now null", new Object[] {this, member, oldAddress});
+            } else {
+                if (oldAddress != null) {
+                    LOG.info("Replacing in {}, member {} with old address {}, 
new address {}", new Object[] {this, member, oldAddress, newAddress});
+                } else {
+                    LOG.info("Adding to {}, new member {} with address {}", 
new Object[] {this, member, newAddress});
+                }
+            }
 
-        LOG.info("Adding to {}, new member {} with address {}", new Object[] 
{this, member, address});
-        
-        update();
-        serverPoolTargets.put(member, address);
-    }
-    
-    protected synchronized void removeServerPoolMember(Entity member) {
-        if (!serverPoolTargets.containsKey(member)) {
-            if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't 
have member {}", new Object[] {this, member});
-            return;
+            if (Objects.equal(oldAddress, newAddress)) {
+                if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change 
in member {} because address still {}", new Object[] {this, member, 
newAddress});
+                return;
+            }
+
+            // TODO this does it synchronously; an async method leaning on 
`updateNeeded` and `update` might
+            // be more appropriate, especially when this is used in a listener
+            MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress);
+            updateAsync();
         }
-        
-        String address = serverPoolTargets.get(member);
-        if (address != null) {
-            serverPoolAddresses.remove(address);
+    }
+
+    protected void removeServerPoolMember(Entity member) {
+        synchronized (mutex) {
+            if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) {
+                if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as 
don't have member {}", new Object[] {this, member});
+                return;
+            }
+
+            String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, 
member);
+
+            LOG.info("Removing from {}, member {} with address {}", new 
Object[] {this, member, address});
+
+            updateAsync();
         }
-        
-        LOG.info("Removing from {}, member {} with address {}", new Object[] 
{this, member, address});
-        
-        update();
-        serverPoolTargets.remove(member);
     }
-    
+
     protected String getAddressOfEntity(Entity member) {
         AttributeSensor<String> hostAndPortSensor = getHostAndPortSensor();
         if (hostAndPortSensor != null) {

Reply via email to