Repository: brooklyn-server Updated Branches: refs/heads/master 55ec074f4 -> 5c5d578f4
BROOKLYN-425: softwareProcess.rebind execs with entity context Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/d9e4c6f3 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/d9e4c6f3 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/d9e4c6f3 Branch: refs/heads/master Commit: d9e4c6f3e7ffa6bd5c17c99bc6c6c9d4690b1ae6 Parents: 0570ca5 Author: Aled Sage <aled.s...@gmail.com> Authored: Thu Jan 12 18:16:49 2017 +0000 Committer: Aled Sage <aled.s...@gmail.com> Committed: Tue Jan 24 15:42:08 2017 +0000 ---------------------------------------------------------------------- .../entity/machine/AddMachineMetrics.java | 2 +- .../software/base/SoftwareProcessImpl.java | 42 +++--- .../machine/MachineEntityJcloudsRebindTest.java | 142 +++++++++++++++++++ .../entity/machine/MachineEntityRebindTest.java | 20 ++- 4 files changed, 179 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d9e4c6f3/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java index 5746f60..a89e479 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/AddMachineMetrics.java @@ -88,6 +88,7 @@ public class AddMachineMetrics implements EntityInitializer { public static SshFeed createMachineMetricsFeed(EntityLocal entity) { boolean retrieveUsageMetrics = entity.config().get(SoftwareProcess.RETRIEVE_USAGE_METRICS); return SshFeed.builder() + .uniqueTag("machineMetricsFeed") .period(Duration.THIRTY_SECONDS) .entity(entity) .poll(SshPollConfig.forSensor(MachineAttributes.UPTIME) @@ -159,5 +160,4 @@ public class AddMachineMetrics implements EntityInitializer { })) .build(); } - } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d9e4c6f3/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java index 0d2661e..595be97 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/software/base/SoftwareProcessImpl.java @@ -18,25 +18,12 @@ */ package org.apache.brooklyn.entity.software.base; -import groovy.time.TimeDuration; - import java.util.Collection; import java.util.Map; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import org.apache.brooklyn.location.jclouds.networking.NetworkingEffectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Functions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; - import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntityLocal; import org.apache.brooklyn.api.entity.drivers.DriverDependentEntity; @@ -60,17 +47,29 @@ import org.apache.brooklyn.core.location.LocationConfigKeys; import org.apache.brooklyn.core.location.cloud.CloudLocationConfig; import org.apache.brooklyn.feed.function.FunctionFeed; import org.apache.brooklyn.feed.function.FunctionPollConfig; +import org.apache.brooklyn.location.jclouds.networking.NetworkingEffectors; import org.apache.brooklyn.location.ssh.SshMachineLocation; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.config.ConfigBag; +import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.DynamicTasks; +import org.apache.brooklyn.util.core.task.ScheduledTask; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.guava.Maybe; import org.apache.brooklyn.util.time.CountdownTimer; import org.apache.brooklyn.util.time.Duration; import org.apache.brooklyn.util.time.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Functions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +import groovy.time.TimeDuration; /** * An {@link Entity} representing a piece of software which can be installed, run, and controlled. @@ -389,21 +388,26 @@ public abstract class SoftwareProcessImpl extends AbstractEntity implements Soft } else { long delay = (long) (Math.random() * configuredMaxDelay.toMilliseconds()); LOG.debug("Scheduled reconnection of sensors on {} in {}ms", this, delay); - Timer timer = new Timer(); - timer.schedule(new TimerTask() { - @Override public void run() { + + // This is functionally equivalent to new scheduledExecutor.schedule(job, delay, TimeUnit.MILLISECONDS). + // It uses the entity's execution context to schedule and thus execute the job. + Map<?,?> flags = MutableMap.of("delay", Duration.millis(delay), "maxIterations", 1, "cancelOnException", true); + Callable<Void> job = new Callable<Void>() { + public Void call() { try { if (getManagementSupport().isNoLongerManaged()) { LOG.debug("Entity {} no longer managed; ignoring scheduled connect sensors on rebind", SoftwareProcessImpl.this); - return; + return null; } connectSensors(); } catch (Throwable e) { LOG.warn("Problem connecting sensors on rebind of "+SoftwareProcessImpl.this, e); Exceptions.propagateIfFatal(e); } - } - }, delay); + return null; + }}; + ScheduledTask scheduledTask = new ScheduledTask(flags, new BasicTask<Void>(job)); + getExecutionContext().submit(scheduledTask); } // don't wait here - it may be long-running, e.g. if remote entity has died, and we don't want to block rebind waiting or cause it to fail // the service will subsequently show service not up and thus failure http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d9e4c6f3/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityJcloudsRebindTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityJcloudsRebindTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityJcloudsRebindTest.java new file mode 100644 index 0000000..196a51c --- /dev/null +++ b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityJcloudsRebindTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.machine; + +import static org.testng.Assert.fail; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.sensor.Feed; +import org.apache.brooklyn.core.entity.Attributes; +import org.apache.brooklyn.core.entity.EntityAsserts; +import org.apache.brooklyn.core.entity.EntityInternal; +import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; +import org.apache.brooklyn.location.jclouds.JcloudsLocation; +import org.apache.brooklyn.location.jclouds.JcloudsRebindStubUnitTest; +import org.apache.brooklyn.location.jclouds.StubbedComputeServiceRegistry; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.CustomResponse; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecCmd; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool.ExecParams; +import org.apache.brooklyn.util.time.Duration; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; + +public class MachineEntityJcloudsRebindTest extends JcloudsRebindStubUnitTest { + + //TODO To decrease noisy erroneous warnings, could also stub the response for + // "free | grep Mem" etc in initSshCustomResponses() + + @Override + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + super.setUp(); + initSshCustomResponses(); + } + + private void initSshCustomResponses() { + RecordingSshTool.setCustomResponse("cat /proc/uptime", new RecordingSshTool.CustomResponseGenerator() { + private final AtomicInteger counter = new AtomicInteger(1); + @Override + public CustomResponse generate(ExecParams execParams) throws Exception { + return new CustomResponse(0, Integer.toString(counter.getAndIncrement()), ""); + }}); + RecordingSshTool.setCustomResponse(".*/etc/os-release.*", new RecordingSshTool.CustomResponseGenerator() { + @Override + public CustomResponse generate(ExecParams execParams) throws Exception { + String stdout = Joiner.on("\n").join( + "name:centos", + "version:7.0", + "architecture:myarch", + "ram:1024", + "cpus:1"); + return new CustomResponse(0, stdout, ""); + }}); + } + + // See https://issues.apache.org/jira/browse/BROOKLYN-425 + @Test + @Override + public void testRebind() throws Exception { + this.nodeCreator = newNodeCreator(); + this.computeServiceRegistry = new StubbedComputeServiceRegistry(nodeCreator, false); + JcloudsLocation origJcloudsLoc = newJcloudsLocation(computeServiceRegistry); + + MachineEntity machine = origApp.createAndManageChild(EntitySpec.create(MachineEntity.class) + .configure(MachineEntity.MAXIMUM_REBIND_SENSOR_CONNECT_DELAY, Duration.millis(100))); + origApp.start(ImmutableList.of(origJcloudsLoc)); + EntityAsserts.assertAttributeEqualsEventually(machine, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + RecordingSshTool.clear(); + initSshCustomResponses(); + rebind(); + + Entity newMachine = mgmt().getEntityManager().getEntity(machine.getId()); + EntityAsserts.assertAttributeEqualsEventually(newMachine, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); + + // Expect SshMachineLocation.inferMachineDetails to have successfully retrieved os details, + // which we've stubbed to return centos (in ssh call). + assertRecordedSshCmdContainsEventually("/etc/os-release"); + + // TODO Would like to assert that we have the feed, but it's not actually added to the entity! + // See AddMachineMetrics.createMachineMetricsFeed, which doesn't call `feeds().addFeed()` so + // it's not persisted and is not accessible from entity.feeds().getFeeds(). Instead, it just + // adds the entity to the feed (which is the old way, for if your feed is not persistable). + // assertHasFeedEventually(newMachine, "machineMetricsFeed"); + + // TODO AddMachineMetrics.createMachineMetricsFeed poll period is not configurable; + // we'd have to wait 30 seconds for a change. + // EntityAsserts.assertAttributeChangesEventually(newMachine, MachineAttributes.UPTIME); + } + + private void assertRecordedSshCmdContainsEventually(final String expected) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + List<ExecCmd> cmds = RecordingSshTool.getExecCmds(); + for (ExecCmd cmd : cmds) { + if (cmd.commands.toString().contains(expected)) { + return; + } + } + fail("Commands (" + expected + ") not contain in " + cmds); + }}); + } + + @SuppressWarnings("unused") + private void assertHasFeedEventually(final Entity entity, final String uniqueTag) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + Collection<Feed> feeds = ((EntityInternal)entity).feeds().getFeeds(); + for (Feed feed : feeds) { + if (uniqueTag.equals(feed.getUniqueTag())) { + return; + } + } + fail("No feed found with uniqueTag "+uniqueTag+" in entity "+entity+"; feeds="+feeds); + }}); + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/d9e4c6f3/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityRebindTest.java ---------------------------------------------------------------------- diff --git a/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityRebindTest.java b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityRebindTest.java index 2480ee6..f9f9d41 100644 --- a/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityRebindTest.java +++ b/software/base/src/test/java/org/apache/brooklyn/entity/machine/MachineEntityRebindTest.java @@ -20,25 +20,31 @@ package org.apache.brooklyn.entity.machine; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.location.LocationSpec; import org.apache.brooklyn.core.entity.Attributes; import org.apache.brooklyn.core.entity.EntityAsserts; import org.apache.brooklyn.core.entity.lifecycle.Lifecycle; import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp; -import org.apache.brooklyn.entity.software.base.EmptySoftwareProcess; +import org.apache.brooklyn.location.ssh.SshMachineLocation; +import org.apache.brooklyn.util.core.internal.ssh.RecordingSshTool; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; public class MachineEntityRebindTest extends RebindTestFixtureWithApp { - @Test(groups = "Integration") + @Test public void testRebindToMachineEntity() throws Exception { - EmptySoftwareProcess machine = origApp.createAndManageChild(EntitySpec.create(EmptySoftwareProcess.class)); - origApp.start(ImmutableList.of(origManagementContext.getLocationRegistry().getLocationManaged("localhost"))); + SshMachineLocation loc = mgmt().getLocationManager().createLocation(LocationSpec.create(SshMachineLocation.class) + .configure("address", "localhost") + .configure(SshMachineLocation.SSH_TOOL_CLASS, RecordingSshTool.class.getName())); + MachineEntity machine = origApp.createAndManageChild(EntitySpec.create(MachineEntity.class)); + origApp.start(ImmutableList.of(loc)); EntityAsserts.assertAttributeEqualsEventually(machine, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); - rebind(false); - Entity machine2 = newManagementContext.getEntityManager().getEntity(machine.getId()); + + rebind(); + + Entity machine2 = mgmt().getEntityManager().getEntity(machine.getId()); EntityAsserts.assertAttributeEqualsEventually(machine2, Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING); } - }