Repository: ambari Updated Branches: refs/heads/trunk 77d613663 -> 58347de3d
AMBARI-6662. API required to provide temporal min/max/avg/sum/rate for a Flume metric (ncole) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/58347de3 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/58347de3 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/58347de3 Branch: refs/heads/trunk Commit: 58347de3d90bfe2d26bb71553d7931bd4f7d37d0 Parents: 77d6136 Author: Nate Cole <nc...@hortonworks.com> Authored: Tue Jul 29 14:58:54 2014 -0400 Committer: Nate Cole <nc...@hortonworks.com> Committed: Tue Jul 29 17:40:01 2014 -0400 ---------------------------------------------------------------------- .../main/python/ambari_agent/AmbariConfig.py | 2 +- .../nagios/NagiosPropertyProvider.java | 25 --- .../HDP/2.0.6/services/FLUME/metrics.json | 128 ++++++++++++++ .../services/FLUME/package/scripts/flume.py | 2 +- .../GANGLIA/package/templates/rrd.py.j2 | 172 +++++++++++++++++-- .../nagios/NagiosPropertyProviderTest.java | 18 -- .../server/upgrade/UpgradeCatalog160Test.java | 6 +- .../python/stacks/2.0.6/FLUME/test_flume.py | 6 +- 8 files changed, 291 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index b27097a..688dc74 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -31,7 +31,7 @@ secured_url_port=8441 [agent] prefix=/tmp/ambari-agent -tmp_dir=/tmp/ambari-agent/tmp # For test purposes +tmp_dir=/tmp/ambari-agent/tmp data_cleanup_interval=86400 data_cleanup_max_age=2592000 ping_port=8670 http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java index cc0f939..64d5e58 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProvider.java @@ -155,7 +155,6 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv } catch (Exception e) { LOG.error("Could not load Nagios alerts: " + e.getMessage()); } - alerts.addAll(convertAlerts(clusterName)); CLUSTER_ALERTS.put(clusterName, alerts); } } @@ -164,30 +163,6 @@ public class NagiosPropertyProvider extends BaseProvider implements PropertyProv /** - * Convert Alert from cluster to NagiosAlert - * @param clusterName the cluster name - * @return Collection of NagiosAlerts - * @throws AmbariException - */ - public List<NagiosAlert> convertAlerts(String clusterName) { - Cluster cluster; - try { - cluster = clusters.getCluster(clusterName); - } catch (AmbariException ex) { - return new ArrayList<NagiosAlert>(); - } - Collection<Alert> clusterAlerts = cluster.getAlerts(); - List<NagiosAlert> results = new ArrayList<NagiosAlert>(); - if (clusterAlerts != null) { - for (Alert alert : clusterAlerts) { - NagiosAlert a = new NagiosAlert(alert); - results.add(a); - } - } - return results; - } - - /** * Use only for testing to remove all cached alerts. */ public void forceReset() { http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/metrics.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/metrics.json b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/metrics.json index c560d2a..2114c12 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/metrics.json +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/metrics.json @@ -288,7 +288,71 @@ "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount", "pointInTime":true, "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/min": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/max": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._sum", + "pointInTime":false, + "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/max": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/min": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._sum", + "pointInTime":false, + "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/max": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/min": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._sum", + "pointInTime":false, + "temporal":true } + } } ], @@ -580,7 +644,71 @@ "metric":"(\\w+).SOURCE.(\\w+).EventAcceptedCount", "pointInTime":true, "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/max": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/min": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventTakeSuccessCount/rate/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).EventTakeSuccessCount._rate._sum", + "pointInTime":false, + "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/max": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/min": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")EventPutSuccessCount/rate/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).EventPutSuccessCount._rate._sum", + "pointInTime":false, + "temporal":true + }, + + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/avg": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._avg", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/max": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._max", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/min": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._min", + "pointInTime":false, + "temporal":true + }, + "metrics/flume/$1.substring(0)/CHANNEL/$2.replaceAll(\"[^-]+\",\"\")ChannelSize/sum": { + "metric":"(\\w+).CHANNEL.(\\w+).ChannelSize._sum", + "pointInTime":false, + "temporal":true } + } } ] http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py index 1cbdf39..4016c79 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/FLUME/package/scripts/flume.py @@ -82,7 +82,7 @@ def flume(action = None): Execute(flume_cmd, wait_for_finish=False) # sometimes startup spawns a couple of threads - so only the first line may count - pid_cmd = format('pgrep -o -u {flume_user} -f ^{java_home} > {flume_agent_pid_file}') + pid_cmd = format('pgrep -o -u {flume_user} -f ^{java_home}.*{agent}.* > {flume_agent_pid_file}') Execute(pid_cmd, logoutput=True, tries=10, try_sleep=1) pass http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/templates/rrd.py.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/templates/rrd.py.j2 b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/templates/rrd.py.j2 index a3561e9..d4ded14 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/templates/rrd.py.j2 +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/GANGLIA/package/templates/rrd.py.j2 @@ -19,22 +19,20 @@ limitations under the License. ''' import cgi +import glob import os +import re import rrdtool import sys import time -import re import urlparse # place this script in /var/www/cgi-bin of the Ganglia collector # requires 'yum install rrdtool-python' on the Ganglia collector - - -def printMetric(clusterName, hostName, metricName, file, cf, start, end, - resolution, pointInTime): - if clusterName.endswith("rrds"): - clusterName = "" - +''' + Loads rrd file info +''' +def loadRRDData(file, cf, start, end, resolution): args = [file, cf, "--daemon", "unix:{{ganglia_runtime_dir}}/rrdcached.limited.sock"] if start is not None: @@ -48,7 +46,139 @@ def printMetric(clusterName, hostName, metricName, file, cf, start, end, if resolution is not None: args.extend(["-r", resolution]) - rrdMetric = rrdtool.fetch(args) + return rrdtool.fetch(args) + +''' + Collects metrics across several matching filenames. +''' +def collectStatMetrics(clusterName, hostName, metricName, files, cf, start, end, resolution): + if clusterName[0] is not '/': + clusterName.insert(0, '/') + + metricParts = metricName.split('.') + + # already know there's at least one + metricStat = metricParts[-1] + metricName = '.'.join(metricParts[:-1]) + + isRate = False + if len(metricParts) > 1 and metricParts[-2] == '_rate': + isRate = True + metricName = '.'.join(metricParts[:-2]) + + pattern = re.compile(metricName + '\.rrd$') + matchedFiles = filter(pattern.match, files) + + parentPath = os.path.join(*clusterName) + + actualFiles = [] + for matchedFile in matchedFiles: + if hostName != "__SummaryInfo__": + osFiles = glob.glob(os.path.join(parentPath, hostName, matchedFile)) + else: + osFiles = glob.glob(os.path.join(parentPath, '*', matchedFile)) + + for f in osFiles: + if -1 == f.find("__SummaryInfo__"): + actualFiles.append(f) + + if len(actualFiles) == 0: + return + + ''' + [ + { + "step_value": update each iteration + "count": increase by 1 each iteration + "sum": increase by value each iteration + "avg": update each iteration as sum/count + "min": update each iteration if step_value < old min OR min is missing (first time) + "max": update each iteration if step_value > old max OR max is missing (first time) + } + ] + ''' + + timestamp = None + stepsize = None + concreteMetricName = None + vals = None # values across all files + + for file in actualFiles: + rrdMetric = loadRRDData(file, cf, start, end, resolution) + + if timestamp is None and stepsize is None and concreteMetricName is None: + timestamp = rrdMetric[0][0] + stepsize = rrdMetric[0][2] + suffix = metricStat if not isRate else '_rate.' + metricStat + concreteMetricName = file.split(os.sep).pop().replace('rrd', suffix) + + metricValues = rrdMetric[2] + + if vals is None: + vals = [None] * len(metricValues) + + i = 0 + for tuple in metricValues: + if vals[i] is None: + vals[i] = {} + vals[i]['count'] = 0 + vals[i]['_sum'] = 0 + vals[i]['_avg'] = 0 + vals[i]['_min'] = 0 + vals[i]['_max'] = 0 + + rawValue = tuple[0] + vals[i]['step_value'] = rawValue + if rawValue is None: + i += 1 + continue + + if isRate: + if 0 == i: + rawValue = 0.0 + elif vals[i-1]['step_value'] is None: + rawValue = 0.0 + else: + rawValue = (rawValue - vals[i-1]['step_value']) / stepsize + + vals[i]['count'] += 1 + vals[i]['_sum'] += rawValue + + vals[i]['_avg'] = vals[i]['_sum']/vals[i]['count'] + + if rawValue < vals[i]['_min']: + vals[i]['_min'] = rawValue + + if rawValue > vals[i]['_max']: + vals[i]['_max'] = rawValue + + i += 1 + + sys.stdout.write("sum\n") + sys.stdout.write(clusterName[len(clusterName)-1] + "\n") + sys.stdout.write(hostName + "\n") + sys.stdout.write(concreteMetricName + "\n") + sys.stdout.write(str(timestamp) + "\n") + sys.stdout.write(str(stepsize) + "\n") + + for val in vals: + if val['step_value'] is None: + sys.stdout.write("[~n]") + else: + sys.stdout.write(str(val[metricStat])) + sys.stdout.write("\n") + + sys.stdout.write("[~EOM]\n") + + return + +def printMetric(clusterName, hostName, metricName, file, cf, start, end, + resolution, pointInTime): + if clusterName.endswith("rrds"): + clusterName = "" + + rrdMetric = loadRRDData(file, cf, start, end, resolution) + # ds_name sys.stdout.write(rrdMetric[1][0]) sys.stdout.write("\n") @@ -198,14 +328,22 @@ for cluster in clusterParts: os.path.join(path, file), cf, start, end, resolution, pointInTime) else: - #Regex as metric name - metricRegex = metric + '\.rrd$' - p = re.compile(metricRegex) - matchedFiles = filter(p.match, files) - for matchedFile in matchedFiles: - printMetric(pathParts[-2], pathParts[-1], matchedFile[:-4], - os.path.join(path, matchedFile), cf, start, end, - resolution, pointInTime) + need_stats = False + parts = metric.split(".") + if len(parts) > 0 and parts[-1] in ['_min', '_max', '_avg', '_sum']: + need_stats = True + + if need_stats and not pointInTime: + collectStatMetrics(pathParts[:-1], pathParts[-1], metric, files, cf, start, end, resolution) + else: + #Regex as metric name + metricRegex = metric + '\.rrd$' + p = re.compile(metricRegex) + matchedFiles = filter(p.match, files) + for matchedFile in matchedFiles: + printMetric(pathParts[-2], pathParts[-1], matchedFile[:-4], + os.path.join(path, matchedFile), cf, start, end, + resolution, pointInTime) sys.stdout.write("[~EOF]\n") # write end time http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java index cbd3262..0a0821a 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/nagios/NagiosPropertyProviderTest.java @@ -443,24 +443,6 @@ public class NagiosPropertyProviderTest { } @Test - public void testConvertAlerts() throws Exception { - Injector inj = Guice.createInjector(new GuiceModule()); - - Clusters clusters = inj.getInstance(Clusters.class); - Cluster cluster = createMock(Cluster.class); - expect(cluster.getAlerts()).andReturn(Collections.<Alert>emptySet()).anyTimes(); - expect(clusters.getCluster("c1")).andReturn(cluster); - replay(clusters, cluster); - TestStreamProvider streamProvider = new TestStreamProvider("nagios_alerts.txt"); - NagiosPropertyProvider npp = new NagiosPropertyProvider(Resource.Type.Service, - streamProvider, "ServiceInfo/cluster_name", "ServiceInfo/service_name"); - List<NagiosAlert> list = npp.convertAlerts("c1"); - Assert.assertNotNull(list); - Assert.assertEquals(0, list.size()); - } - - - @Test public void testNagiosServiceAlertsWithPassive() throws Exception { Injector inj = Guice.createInjector(new GuiceModule()); http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog160Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog160Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog160Test.java index 1b2526c..515f8cd 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog160Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog160Test.java @@ -234,17 +234,17 @@ public class UpgradeCatalog160Test { assertNull(column.getDefaultValue()); assertTrue(column.isNullable()); } - + @Test public void testGetSourceVersion() { final DBAccessor dbAccessor = createNiceMock(DBAccessor.class); UpgradeCatalog upgradeCatalog = getUpgradeCatalog(dbAccessor); Assert.assertEquals("1.5.1", upgradeCatalog.getSourceVersion()); - } + } /** * Checks that the restart_require column was created correct when using a * non-Postgres DB (MySQL, Oracle, etc). - * + * * @param restartRequiredColumnCapture */ private void assertRestartRequiredColumn( http://git-wip-us.apache.org/repos/asf/ambari/blob/58347de3/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py index 1abd849..49462e8 100644 --- a/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py +++ b/ambari-server/src/test/python/stacks/2.0.6/FLUME/test_flume.py @@ -57,7 +57,7 @@ class TestFlumeHandler(RMFTestCase): '-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655"'), wait_for_finish = False) - self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45 > /var/run/flume/a1.pid', + self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*a1.* > /var/run/flume/a1.pid', logoutput = True, tries = 10, try_sleep = 1) @@ -205,7 +205,7 @@ class TestFlumeHandler(RMFTestCase): '-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655"'), wait_for_finish = False) - self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45 > /var/run/flume/b1.pid', + self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*b1.* > /var/run/flume/b1.pid', logoutput = True, tries = 10, try_sleep = 1) @@ -233,7 +233,7 @@ class TestFlumeHandler(RMFTestCase): '-Dflume.monitoring.hosts=c6401.ambari.apache.org:8655"'), wait_for_finish = False) - self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45 > /var/run/flume/b1.pid', + self.assertResourceCalled('Execute', 'pgrep -o -u flume -f ^/usr/jdk64/jdk1.7.0_45.*b1.* > /var/run/flume/b1.pid', logoutput = True, tries = 10, try_sleep = 1)