Author: jlowe
Date: Tue Jul 23 19:25:19 2013
New Revision: 1506230
URL: http://svn.apache.org/r1506230
Log:
svn merge -c 1506226 FIXES: MAPREDUCE-5356. Ability to refresh aggregated log
retention period and check interval. Contributed by Ashwin Shankar
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java?rev=1506230&r1=1506229&r2=1506230&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
Tue Jul 23 19:25:19 2013
@@ -41,6 +41,7 @@ public class AggregatedLogDeletionServic
private static final Log LOG =
LogFactory.getLog(AggregatedLogDeletionService.class);
private Timer timer = null;
+ private long checkIntervalMsecs;
static class LogDeletionTask extends TimerTask {
private Configuration conf;
@@ -133,37 +134,71 @@ public class AggregatedLogDeletionServic
@Override
protected void serviceStart() throws Exception {
+ scheduleLogDeletionTask();
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ stopTimer();
+ super.serviceStop();
+ }
+
+ private void setLogAggCheckIntervalMsecs(long retentionSecs) {
+ Configuration conf = getConfig();
+ checkIntervalMsecs = 1000 * conf
+ .getLong(
+ YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
+ if (checkIntervalMsecs <= 0) {
+ // when unspecified compute check interval as 1/10th of retention
+ checkIntervalMsecs = (retentionSecs * 1000) / 10;
+ }
+ }
+
+ public void refreshLogRetentionSettings() {
+ if (getServiceState() == STATE.STARTED) {
+ Configuration conf = createConf();
+ setConfig(conf);
+ stopTimer();
+ scheduleLogDeletionTask();
+ } else {
+ LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log
Deletion Service is not started");
+ }
+ }
+
+ private void scheduleLogDeletionTask() {
Configuration conf = getConfig();
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
- //Log aggregation is not enabled so don't bother
+ // Log aggregation is not enabled so don't bother
return;
}
- long retentionSecs =
conf.getLong(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
+ long retentionSecs = conf.getLong(
+ YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS,
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_SECONDS);
- if(retentionSecs < 0) {
- LOG.info("Log Aggregation deletion is disabled because retention is" +
- " too small (" + retentionSecs + ")");
+ if (retentionSecs < 0) {
+ LOG.info("Log Aggregation deletion is disabled because retention is"
+ + " too small (" + retentionSecs + ")");
return;
}
- long checkIntervalMsecs = 1000 * conf.getLong(
- YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-
YarnConfiguration.DEFAULT_LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS);
- if (checkIntervalMsecs <= 0) {
- // when unspecified compute check interval as 1/10th of retention
- checkIntervalMsecs = (retentionSecs * 1000) / 10;
- }
+ setLogAggCheckIntervalMsecs(retentionSecs);
TimerTask task = new LogDeletionTask(conf, retentionSecs);
timer = new Timer();
timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
- super.serviceStart();
}
- @Override
- protected void serviceStop() throws Exception {
- if(timer != null) {
+ private void stopTimer() {
+ if (timer != null) {
timer.cancel();
}
- super.serviceStop();
+ }
+
+ public long getCheckIntervalMsecs() {
+ return checkIntervalMsecs;
+ }
+
+ protected Configuration createConf() {
+ return new Configuration();
}
}
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java?rev=1506230&r1=1506229&r2=1506230&view=diff
==============================================================================
---
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
(original)
+++
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
Tue Jul 23 19:25:19 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Assert;
import static org.mockito.Mockito.*;
@@ -129,6 +130,99 @@ public class TestAggregatedLogDeletionSe
}
@Test
+ public void testRefreshLogRetentionSettings() throws IOException {
+ long now = System.currentTimeMillis();
+ //time before 2000 sec
+ long before2000Secs = now - (2000 * 1000);
+ //time before 50 sec
+ long before50Secs = now - (50 * 1000);
+ String root = "mockfs://foo/";
+ String remoteRootLogDir = root + "tmp/logs";
+ String suffix = "logs";
+ final Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+ conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ "1");
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
+
+ Path rootPath = new Path(root);
+ FileSystem rootFs = rootPath.getFileSystem(conf);
+ FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
+
+ Path remoteRootLogPath = new Path(remoteRootLogDir);
+
+ Path userDir = new Path(remoteRootLogPath, "me");
+ FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+ userDir);
+
+ when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
+ new FileStatus[] { userDirStatus });
+
+ Path userLogDir = new Path(userDir, suffix);
+
+ //Set time last modified of app1Dir directory and its files to
before2000Secs
+ Path app1Dir = new Path(userLogDir, "application_1_1");
+ FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
+ app1Dir);
+
+ //Set time last modified of app2Dir directory and its files to
before50Secs
+ Path app2Dir = new Path(userLogDir, "application_1_2");
+ FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
+ app2Dir);
+
+ when(mockFs.listStatus(userLogDir)).thenReturn(
+ new FileStatus[] { app1DirStatus, app2DirStatus });
+
+ Path app1Log1 = new Path(app1Dir, "host1");
+ FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
+ app1Log1);
+
+ when(mockFs.listStatus(app1Dir)).thenReturn(
+ new FileStatus[] { app1Log1Status });
+
+ Path app2Log1 = new Path(app2Dir, "host1");
+ FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
+ app2Log1);
+
+ when(mockFs.listStatus(app2Dir)).thenReturn(
+ new FileStatus[] { app2Log1Status });
+
+ AggregatedLogDeletionService deletionSvc = new
AggregatedLogDeletionService() {
+ @Override
+ protected Configuration createConf() {
+ return conf;
+ }
+ };
+
+ deletionSvc.init(conf);
+ deletionSvc.start();
+
+ //app1Dir would be deleted since it is older than the log retention period
+ verify(mockFs, timeout(10000)).delete(app1Dir, true);
+ //app2Dir is not expected to be deleted since it is below the threshold
+ verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
+
+ //Now, let's change the confs
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
+ conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+ "2");
+ //We have not called refreshLogSettings,hence don't expect to see the
changed conf values
+ Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+
+ //refresh the log settings
+ deletionSvc.refreshLogRetentionSettings();
+
+ //Check interval time should reflect the new value
+ Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+ //app2Dir should be deleted since it falls above the threshold
+ verify(mockFs, timeout(10000)).delete(app2Dir, true);
+ deletionSvc.stop();
+ }
+
+ @Test
public void testCheckInterval() throws Exception {
long RETENTION_SECS = 10 * 24 * 3600;
long now = System.currentTimeMillis();
@@ -176,7 +270,7 @@ public class TestAggregatedLogDeletionSe
new AggregatedLogDeletionService();
deletionSvc.init(conf);
deletionSvc.start();
-
+
verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
verify(mockFs, never()).delete(app1Dir, true);