Update NonProvisioningController - Bring the code more in line with how AbstractController is written. - Tested it with the downstream project: https://github.com/cloudsoft/brooklyn-aws-elb/
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-library/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-library/commit/7e695d19 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-library/tree/7e695d19 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-library/diff/7e695d19 Branch: refs/heads/master Commit: 7e695d19ef599d2d67ab81822106781db6bbfe33 Parents: 3bdd17e Author: Aled Sage <aled.s...@gmail.com> Authored: Fri Feb 19 19:37:06 2016 +0000 Committer: Aled Sage <aled.s...@gmail.com> Committed: Fri Mar 11 22:28:25 2016 +0000 ---------------------------------------------------------------------- .../entity/proxy/AbstractControllerImpl.java | 2 +- .../proxy/AbstractNonProvisionedController.java | 4 + .../AbstractNonProvisionedControllerImpl.java | 321 ++++++++++++------- 3 files changed, 219 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java ---------------------------------------------------------------------- diff --git a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java index 47ef7a9..257882d 100644 --- a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java +++ b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractControllerImpl.java @@ -499,7 +499,7 @@ public abstract class AbstractControllerImpl extends SoftwareProcessImpl impleme } // Utilities for modifying an AttributeSensor of type map - private static class MapAttribute { + static class MapAttribute { public static <K, V> V put(Entity entity, AttributeSensor<Map<K,V>> attribute, K key, V value) { Map<K, V> oldMap = entity.getAttribute(attribute); Map<K, V> newMap = 
MutableMap.copyOf(oldMap); http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java ---------------------------------------------------------------------- diff --git a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java index 70df171..cbcfe65 100644 --- a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java +++ b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedController.java @@ -18,6 +18,8 @@ */ package org.apache.brooklyn.entity.proxy; +import java.util.Set; + import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.ImplementedBy; @@ -25,4 +27,6 @@ import org.apache.brooklyn.api.entity.ImplementedBy; public interface AbstractNonProvisionedController extends LoadBalancer, Entity { public boolean isActive(); + + Set<String> getServerPoolAddresses(); } http://git-wip-us.apache.org/repos/asf/brooklyn-library/blob/7e695d19/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java ---------------------------------------------------------------------- diff --git a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java index 137be36..c26235a 100644 --- a/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java +++ b/software/webapp/src/main/java/org/apache/brooklyn/entity/proxy/AbstractNonProvisionedControllerImpl.java @@ -18,9 +18,9 @@ */ package org.apache.brooklyn.entity.proxy; -import static com.google.common.base.Preconditions.checkState; import 
static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth; +import java.net.URI; import java.util.Collection; import java.util.Map; import java.util.Set; @@ -28,20 +28,28 @@ import java.util.Set; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.Group; import org.apache.brooklyn.api.location.Location; +import org.apache.brooklyn.api.mgmt.Task; +import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.policy.PolicySpec; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.core.entity.AbstractEntity; +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic; import org.apache.brooklyn.core.entity.trait.Startable; import org.apache.brooklyn.core.feed.ConfigToAttributes; import org.apache.brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import org.apache.brooklyn.entity.group.Cluster; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.guava.Maybe; +import org.apache.brooklyn.entity.proxy.AbstractControllerImpl.MapAttribute; +import org.apache.brooklyn.util.core.task.Tasks; +import org.apache.brooklyn.util.exceptions.Exceptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Objects; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; public abstract class AbstractNonProvisionedControllerImpl extends AbstractEntity implements AbstractNonProvisionedController { @@ -51,25 +59,9 @@ public abstract class AbstractNonProvisionedControllerImpl extends AbstractEntit protected volatile boolean updateNeeded = true; protected AbstractMembershipTrackingPolicy serverPoolMemberTrackerPolicy; - protected Set<String> serverPoolAddresses = 
Sets.newLinkedHashSet(); - protected Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap(); + protected final Object mutex = new Object(); public AbstractNonProvisionedControllerImpl() { - this(MutableMap.of(), null, null); - } - public AbstractNonProvisionedControllerImpl(Map<?,?> properties) { - this(properties, null, null); - } - public AbstractNonProvisionedControllerImpl(Entity parent) { - this(MutableMap.of(), parent, null); - } - public AbstractNonProvisionedControllerImpl(Map<?,?> properties, Entity parent) { - this(properties, parent, null); - } - public AbstractNonProvisionedControllerImpl(Entity parent, Cluster cluster) { - this(MutableMap.of(), parent, cluster); - } - public AbstractNonProvisionedControllerImpl(Map<?,?> properties, Entity parent, Cluster cluster) { } public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { @@ -78,6 +70,27 @@ public abstract class AbstractNonProvisionedControllerImpl extends AbstractEntit } } + @Override + public void init() { + super.init(); + sensors().set(SERVER_POOL_TARGETS, ImmutableMap.<Entity, String>of()); + } + + @Override + public void rebind() { + super.rebind(); + + // For backwards compatibility, in case anything persisted before this was added + if (sensors().get(SERVER_POOL_TARGETS) == null) { + sensors().set(SERVER_POOL_TARGETS, ImmutableMap.<Entity, String>of()); + } + } + + @Override + public Set<String> getServerPoolAddresses() { + return ImmutableSet.copyOf(Iterables.filter(getAttribute(SERVER_POOL_TARGETS).values(), Predicates.notNull())); + } + /** * Opportunity to do late-binding of the cluster that is being controlled. Must be called before start(). * Can pass in the 'serverPool'. @@ -96,96 +109,179 @@ public abstract class AbstractNonProvisionedControllerImpl extends AbstractEntit @Override public void start(Collection<? extends Location> locations) { + // TODO execute as tasks? 
preStart(); + doStart(locations); + postStart(); } @Override public void stop() { + // TODO execute as tasks? preStop(); + doStop(); + postStop(); } protected void preStart() { - AttributeSensor<?> hostAndPortSensor = getConfig(HOST_AND_PORT_SENSOR); - Maybe<Object> hostnameSensor = config().getRaw(HOSTNAME_SENSOR); - Maybe<Object> portSensor = config().getRaw(PORT_NUMBER_SENSOR); - if (hostAndPortSensor != null) { - checkState(!hostnameSensor.isPresent() && !portSensor.isPresent(), - "Must not set %s and either of %s or %s", HOST_AND_PORT_SENSOR, HOSTNAME_SENSOR, PORT_NUMBER_SENSOR); - } - ConfigToAttributes.apply(this); - addServerPoolMemberTrackerPolicy(); + } + + protected void doStart(Collection<? extends Location> locations) { + } + + protected void postStart() { + sensors().set(PROTOCOL, inferProtocol()); + sensors().set(MAIN_URI, URI.create(inferUrl())); + sensors().set(ROOT_URL, inferUrl()); + addServerPoolMemberTrackingPolicy(); } protected void preStop() { - removeServerPoolMemberTrackerPolicy(); + removeServerPoolMemberTrackingPolicy(); } - - protected void addServerPoolMemberTrackerPolicy() { + + protected void doStop() { + } + + protected void postStop() { + } + + protected abstract String inferProtocol(); + + protected abstract String inferUrl(); + + protected void addServerPoolMemberTrackingPolicy() { Group serverPool = getServerPool(); - if (serverPool != null) { - serverPoolMemberTrackerPolicy = policies().add(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Controller targets tracker") - .configure("group", serverPool)); - - LOG.info("Added policy {} to {}, during start", serverPoolMemberTrackerPolicy, this); - - serverPoolAddresses.clear(); - serverPoolTargets.clear(); - - // Initialize ourselves immediately with the latest set of members; don't wait for - // listener notifications because then will be out-of-date for short period (causing - // problems for rebind) - for (Entity member : getServerPool().getMembers()) { - if 
(belongsInServerPool(member)) { - if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); - String address = getAddressOfEntity(member); - serverPoolTargets.put(member, address); - if (address != null) { - serverPoolAddresses.add(address); - } - } + if (serverPool == null) { + return; // no-op + } + if (serverPoolMemberTrackerPolicy != null) { + LOG.debug("Call to addServerPoolMemberTrackingPolicy when serverPoolMemberTrackingPolicy already exists, removing and re-adding, in {}", this); + removeServerPoolMemberTrackingPolicy(); + } + for (Policy p: policies()) { + if (p instanceof ServerPoolMemberTrackerPolicy) { + // TODO want a more elegant idiom for this! + LOG.info(this+" picking up "+p+" as the tracker (already set, often due to rebind)"); + serverPoolMemberTrackerPolicy = (ServerPoolMemberTrackerPolicy) p; + return; } - - LOG.info("Resetting {}, members {} with addresses {}", new Object[] {this, serverPoolTargets, serverPoolAddresses}); - sensors().set(SERVER_POOL_TARGETS, serverPoolTargets); } + + serverPoolMemberTrackerPolicy = policies().add(PolicySpec.create(MemberTrackingPolicy.class) + .displayName("Controller targets tracker") + .configure("group", serverPool)); + + AttributeSensor<?> hostAndPortSensor = getConfig(HOST_AND_PORT_SENSOR); + AttributeSensor<?> hostnameSensor = getConfig(HOSTNAME_SENSOR); + AttributeSensor<?> portSensor = getConfig(PORT_NUMBER_SENSOR); + Set<AttributeSensor<?>> sensorsToTrack; + if (hostAndPortSensor != null) { + sensorsToTrack = ImmutableSet.<AttributeSensor<?>>of(hostAndPortSensor); + } else { + sensorsToTrack = ImmutableSet.<AttributeSensor<?>>of(hostnameSensor, portSensor); + } + + serverPoolMemberTrackerPolicy = policies().add(PolicySpec.create(ServerPoolMemberTrackerPolicy.class) + .displayName("Controller targets tracker") + .configure("group", serverPool) + .configure("sensorsToTrack", sensorsToTrack)); + + LOG.info("Added policy {} to {}", serverPoolMemberTrackerPolicy, this); + + + // 
Initialize ourselves immediately with the latest set of members; don't wait for + // listener notifications because then will be out-of-date for short period (causing + // problems for rebind) + Map<Entity,String> serverPoolTargets = Maps.newLinkedHashMap(); + for (Entity member : serverPool.getMembers()) { + if (belongsInServerPool(member)) { + if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); + String address = getAddressOfEntity(member); + serverPoolTargets.put(member, address); + } + } + + LOG.info("Resetting {}, server pool targets {}", new Object[] {this, serverPoolTargets}); + sensors().set(SERVER_POOL_TARGETS, serverPoolTargets); } - protected void removeServerPoolMemberTrackerPolicy() { + protected void removeServerPoolMemberTrackingPolicy() { if (serverPoolMemberTrackerPolicy != null) { policies().remove(serverPoolMemberTrackerPolicy); } } + public static class ServerPoolMemberTrackerPolicy extends AbstractMembershipTrackingPolicy { + @Override + protected void onEntityEvent(EventType type, Entity entity) { + // relies on policy-rebind injecting the implementation rather than the dynamic-proxy + ((AbstractNonProvisionedControllerImpl)super.entity).onServerPoolMemberChanged(entity); + } + } + /** * Implementations should update the configuration so that 'serverPoolAddresses' are targeted. * The caller will subsequently call reload to apply the new configuration. 
*/ protected abstract void reconfigureService(); + public void updateNeeded() { + synchronized (mutex) { + if (updateNeeded) return; + updateNeeded = true; + LOG.debug("queueing an update-needed task for "+this+"; update will occur shortly"); + Entities.submit(this, Tasks.builder().displayName("update-needed").body(new Runnable() { + @Override + public void run() { + if (updateNeeded) + AbstractNonProvisionedControllerImpl.this.update(); + } + }).build()); + } + } + @Override - public synchronized void update() { - if (!isActive()) updateNeeded = true; - else { - updateNeeded = false; - LOG.debug("Updating {} in response to changes", this); - reconfigureService(); - LOG.debug("Reloading {} in response to changes", this); - invoke(RELOAD); + public void update() { + try { + Task<?> task = updateAsync(); + if (task != null) task.getUnchecked(); + ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(this, "update"); + } catch (Exception e) { + ServiceStateLogic.ServiceProblemsLogic.updateProblemsIndicator(this, "update", "update failed with: "+Exceptions.collapseText(e)); + throw Exceptions.propagate(e); } - sensors().set(SERVER_POOL_TARGETS, serverPoolTargets); } - protected synchronized void onServerPoolMemberChanged(Entity member) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", - new Object[] {this, member, member.getLocations()}); - if (belongsInServerPool(member)) { - addServerPoolMember(member); - } else { - removeServerPoolMember(member); + public Task<?> updateAsync() { + synchronized (mutex) { + Task<?> result = null; + if (!isActive()) updateNeeded = true; + else { + updateNeeded = false; + LOG.debug("Updating {} in response to changes", this); + LOG.info("Updating {}, server pool targets {}", new Object[] {this, getAttribute(SERVER_POOL_TARGETS)}); + reconfigureService(); + LOG.debug("Reloading {} in response to changes", this); + invoke(RELOAD); + } + return result; + } + } + + + protected 
void onServerPoolMemberChanged(Entity member) { + synchronized (mutex) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, considering membership of {} which is in locations {}", + new Object[] {this, member, member.getLocations()}); + if (belongsInServerPool(member)) { + addServerPoolMember(member); + } else { + removeServerPoolMember(member); + } + if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); } - if (LOG.isTraceEnabled()) LOG.trace("Done {} checkEntity {}", this, member); } protected boolean belongsInServerPool(Entity member) { @@ -217,40 +313,51 @@ public abstract class AbstractNonProvisionedControllerImpl extends AbstractEntit return getAttribute(HOST_AND_PORT_SENSOR); } - protected synchronized void addServerPoolMember(Entity member) { - if (serverPoolTargets.containsKey(member)) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, not adding as already have member {}", new Object[] {this, member}); - return; - } - - String address = getAddressOfEntity(member); - if (address != null) { - serverPoolAddresses.add(address); - } + protected void addServerPoolMember(Entity member) { + synchronized (mutex) { + String oldAddress = getAttribute(SERVER_POOL_TARGETS).get(member); + String newAddress = getAddressOfEntity(member); + if (Objects.equal(newAddress, oldAddress)) { + if (LOG.isTraceEnabled()) + if (LOG.isTraceEnabled()) LOG.trace("Ignoring unchanged address {}", oldAddress); + return; + } else if (newAddress == null) { + LOG.info("Removing from {}, member {} with old address {}, because inferred address is now null", new Object[] {this, member, oldAddress}); + } else { + if (oldAddress != null) { + LOG.info("Replacing in {}, member {} with old address {}, new address {}", new Object[] {this, member, oldAddress, newAddress}); + } else { + LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, newAddress}); + } + } - LOG.info("Adding to {}, new member {} with address {}", new Object[] {this, member, 
address}); - - update(); - serverPoolTargets.put(member, address); - } - - protected synchronized void removeServerPoolMember(Entity member) { - if (!serverPoolTargets.containsKey(member)) { - if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member}); - return; + if (Objects.equal(oldAddress, newAddress)) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, ignoring change in member {} because address still {}", new Object[] {this, member, newAddress}); + return; + } + + // TODO this does it synchronously; an async method leaning on `updateNeeded` and `update` might + // be more appropriate, especially when this is used in a listener + MapAttribute.put(this, SERVER_POOL_TARGETS, member, newAddress); + updateAsync(); } - - String address = serverPoolTargets.get(member); - if (address != null) { - serverPoolAddresses.remove(address); + } + + protected void removeServerPoolMember(Entity member) { + synchronized (mutex) { + if (!getAttribute(SERVER_POOL_TARGETS).containsKey(member)) { + if (LOG.isTraceEnabled()) LOG.trace("For {}, not removing as don't have member {}", new Object[] {this, member}); + return; + } + + String address = MapAttribute.remove(this, SERVER_POOL_TARGETS, member); + + LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address}); + + updateAsync(); } - - LOG.info("Removing from {}, member {} with address {}", new Object[] {this, member, address}); - - update(); - serverPoolTargets.remove(member); } - + protected String getAddressOfEntity(Entity member) { AttributeSensor<String> hostAndPortSensor = getHostAndPortSensor(); if (hostAndPortSensor != null) {