Author: arp Date: Mon Sep 30 18:28:07 2013 New Revision: 1527697 URL: http://svn.apache.org/r1527697 Log: Merging r1526971 through r1527683 from trunk to branch HDFS-2832
Added: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java - copied unchanged from r1527683, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/ (props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (contents, props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/ (props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptID.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskID.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (contents, props changed) hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1526971-1527683 Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt Mon Sep 30 18:28:07 2013 @@ -155,6 +155,8 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin Shankar via jlowe) + MAPREDUCE-5332. Support token-preserving restart of history server (jlowe) + IMPROVEMENTS MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and @@ -232,6 +234,21 @@ Release 2.1.2 - UNRELEASED MAPREDUCE-5513. ConcurrentModificationException in JobControl (Robert Parker via jlowe) + MAPREDUCE-5531. Fix compat with hadoop-1 in mapreduce.(TaskID, + TaskAttemptID) by re-introducing missing constructors. (Robert Kanter via + acmurthy) + + MAPREDUCE-5545. org.apache.hadoop.mapred.TestTaskAttemptListenerImpl.testCommitWindow + times out (Robert Kanter via jlowe) + + MAPREDUCE-5529. Fix compat with hadoop-1 in mapred.TotalOrderPartitioner + by re-introducing (get,set)PartitionFile which takes in JobConf. (Robert + Kanter via acmurthy) + + MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after + the job is really done - a bug caused by MAPREDUCE-5505. (Zhijie Shen via + vinodkv) + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/CHANGES.txt ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1526971-1527683 Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/conf/ ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1526971-1527683 Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Mon Sep 30 18:28:07 2013 @@ -531,19 +531,6 @@ public class MRAppMaster extends Composi // this is the only job, so shut down the Appmaster // note in a workflow scenario, this may lead to creation of a new // job (FIXME?) - // Send job-end notification - if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { - try { - LOG.info("Job end notification started for jobID : " - + job.getReport().getJobId()); - JobEndNotifier notifier = new JobEndNotifier(); - notifier.setConf(getConfig()); - notifier.notify(job.getReport()); - } catch (InterruptedException ie) { - LOG.warn("Job end notification interrupted for jobID : " - + job.getReport().getJobId(), ie); - } - } try { //if isLastAMRetry comes as true, should never set it to false @@ -559,10 +546,28 @@ public class MRAppMaster extends Composi LOG.info("Calling stop for all the services"); MRAppMaster.this.stop(); - // Except ClientService, other services are already stopped, it is safe to - // let clients know the final states. ClientService should wait for some - // time so clients have enough time to know the final states. - safeToReportTerminationToUser.set(true); + if (isLastAMRetry) { + // Except ClientService, other services are already stopped, it is safe to + // let clients know the final states. ClientService should wait for some + // time so clients have enough time to know the final states. + safeToReportTerminationToUser.set(true); + + // Send job-end notification when it is safe to report termination to + // users and it is the last AM retry + if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) { + try { + LOG.info("Job end notification started for jobID : " + + job.getReport().getJobId()); + JobEndNotifier notifier = new JobEndNotifier(); + notifier.setConf(getConfig()); + notifier.notify(job.getReport()); + } catch (InterruptedException ie) { + LOG.warn("Job end notification interrupted for jobID : " + + job.getReport().getJobId(), ie); + } + } + } + try { Thread.sleep(5000); } catch (InterruptedException e) { Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Sep 30 18:28:07 2013 @@ -128,6 +128,8 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; +import com.google.common.annotations.VisibleForTesting; + /** Implementation of Job interface. Maintains the state machines of Job. * The read and write calls use ReadWriteLock for concurrency. */ Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Mon Sep 30 18:28:07 2013 @@ -228,7 +228,7 @@ public class TestTaskAttemptListenerImpl return tce; } - @Test (timeout=1000) + @Test (timeout=10000) public void testCommitWindow() throws IOException { SystemClock clock = new SystemClock(); Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Mon Sep 30 18:28:07 2013 @@ -18,19 +18,41 @@ package org.apache.hadoop.mapreduce.v2.app; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; + +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintStream; import java.net.Proxy; +import java.net.URI; +import java.net.URISyntaxException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer; +import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType; +import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; /** * Tests job end notification * */ +@SuppressWarnings("unchecked") public class TestJobEndNotifier extends JobEndNotifier { //Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS @@ -133,7 +155,7 @@ public class TestJobEndNotifier extends public void testNotifyRetries() throws InterruptedException { Configuration conf = new Configuration(); conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent"); - JobReport jobReport = Mockito.mock(JobReport.class); + JobReport jobReport = mock(JobReport.class); long startTime = System.currentTimeMillis(); this.notificationCount = 0; @@ -162,4 +184,100 @@ public class TestJobEndNotifier extends } + @Test + public void testNotificationOnNormalShutdown() throws Exception { + HttpServer server = startHttpServer(); + // Act like it is the second attempt. Default max attempts is 2 + MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2)); + // Make use of safeToReportflag so that we can look at final job-state as + // seen by real users. + app.safeToReportTerminationToUser.set(false); + doNothing().when(app).sysexit(); + Configuration conf = new Configuration(); + conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, + JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); + JobImpl job = (JobImpl)app.submit(conf); + // Even though auto-complete is true, because app is not shut-down yet, user + // will only see RUNNING state. + app.waitForInternalState(job, JobStateInternal.SUCCEEDED); + app.waitForState(job, JobState.RUNNING); + // Now shutdown. User should see SUCCEEDED state. + app.shutDownJob(); + app.waitForState(job, JobState.SUCCEEDED); + Assert.assertEquals(true, app.isLastAMRetry()); + Assert.assertEquals(1, JobEndServlet.calledTimes); + Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED", + JobEndServlet.requestUri.getQuery()); + Assert.assertEquals(JobState.SUCCEEDED.toString(), + JobEndServlet.foundJobState); + server.stop(); + } + + @Test + public void testNotificationOnNonLastRetryShutdown() throws Exception { + HttpServer server = startHttpServer(); + MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true)); + doNothing().when(app).sysexit(); + // Make use of safeToReportflag so that we can look at final job-state as + // seen by real users. + app.safeToReportTerminationToUser.set(false); + Configuration conf = new Configuration(); + conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL, + JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus"); + JobImpl job = (JobImpl)app.submit(new Configuration()); + app.waitForState(job, JobState.RUNNING); + app.getContext().getEventHandler() + .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT)); + app.waitForInternalState(job, JobStateInternal.REBOOT); + // Not the last AM attempt. So user should that the job is still running. + app.waitForState(job, JobState.RUNNING); + app.shutDownJob(); + Assert.assertEquals(false, app.isLastAMRetry()); + Assert.assertEquals(0, JobEndServlet.calledTimes); + Assert.assertEquals(null, JobEndServlet.requestUri); + Assert.assertEquals(null, JobEndServlet.foundJobState); + server.stop(); + } + + private static HttpServer startHttpServer() throws Exception { + new File(System.getProperty( + "build.webapps", "build/webapps") + "/test").mkdirs(); + HttpServer server = new HttpServer.Builder().setName("test") + .setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build(); + server.addServlet("jobend", "/jobend", JobEndServlet.class); + server.start(); + + JobEndServlet.calledTimes = 0; + JobEndServlet.requestUri = null; + JobEndServlet.baseUrl = "http://localhost:" + server.getPort() + "/"; + JobEndServlet.foundJobState = null; + return server; + } + + @SuppressWarnings("serial") + public static class JobEndServlet extends HttpServlet { + public static volatile int calledTimes = 0; + public static URI requestUri; + public static String baseUrl; + public static String foundJobState; + + @Override + public void doGet(HttpServletRequest request, HttpServletResponse response) + throws ServletException, IOException { + InputStreamReader in = new InputStreamReader(request.getInputStream()); + PrintStream out = new PrintStream(response.getOutputStream()); + + calledTimes++; + try { + requestUri = new URI(null, null, + request.getRequestURI(), request.getQueryString(), null); + foundJobState = request.getParameter("status"); + } catch (URISyntaxException e) { + } + + in.close(); + out.close(); + } + } + } Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Mon Sep 30 18:28:07 2013 @@ -164,6 +164,27 @@ public class JHAdminConfig { public static final String MR_HISTORY_STORAGE = MR_HISTORY_PREFIX + "store.class"; + /** + * Enable the history server to store server state and recover server state + * upon startup. + */ + public static final String MR_HS_RECOVERY_ENABLE = + MR_HISTORY_PREFIX + "recovery.enable"; + public static final boolean DEFAULT_MR_HS_RECOVERY_ENABLE = false; + + /** + * The HistoryServerStateStoreService class to store and recover server state + */ + public static final String MR_HS_STATE_STORE = + MR_HISTORY_PREFIX + "recovery.store.class"; + + /** + * The URI where server state will be stored when + * HistoryServerFileSystemStateStoreService is configured as the state store + */ + public static final String MR_HS_FS_STATE_STORE_URI = + MR_HISTORY_PREFIX + "recovery.store.fs.uri"; + /** Whether to use fixed ports with the minicluster. */ public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX + "minicluster.fixed.ports"; Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Mon Sep 30 18:28:07 2013 @@ -21,7 +21,7 @@ package org.apache.hadoop.mapred.lib; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Partitioner; @@ -41,4 +41,30 @@ public class TotalOrderPartitioner<K ,V> super.setConf(job); } + /** + * Set the path to the SequenceFile storing the sorted partition keyset. + * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt> + * keys in the SequenceFile. + * @deprecated Use + * {@link #setPartitionFile(Configuration, Path)} + * instead + */ + @Deprecated + public static void setPartitionFile(JobConf job, Path p) { + org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner. + setPartitionFile(job, p); + } + + /** + * Get the path to the SequenceFile storing the sorted partition keyset. + * @see #setPartitionFile(JobConf,Path) + * @deprecated Use + * {@link #getPartitionFile(Configuration)} + * instead + */ + @Deprecated + public static String getPartitionFile(JobConf job) { + return org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner. + getPartitionFile(job); + } } Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptID.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptID.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptID.java Mon Sep 30 18:28:07 2013 @@ -76,6 +76,20 @@ public class TaskAttemptID extends org.a int taskId, int id) { this(new TaskID(jtIdentifier, jobId, type, taskId), id); } + + /** + * Constructs a TaskId object from given parts. + * @param jtIdentifier jobTracker identifier + * @param jobId job number + * @param isMap whether the tip is a map + * @param taskId taskId number + * @param id the task attempt number + */ + @Deprecated + public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap, + int taskId, int id) { + this(new TaskID(jtIdentifier, jobId, isMap, taskId), id); + } public TaskAttemptID() { taskId = new TaskID(); Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskID.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskID.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskID.java Mon Sep 30 18:28:07 2013 @@ -91,6 +91,29 @@ public class TaskID extends org.apache.h public TaskID(String jtIdentifier, int jobId, TaskType type, int id) { this(new JobID(jtIdentifier, jobId), type, id); } + + /** + * Constructs a TaskID object from given {@link JobID}. + * @param jobId JobID that this tip belongs to + * @param isMap whether the tip is a map + * @param id the tip number + */ + @Deprecated + public TaskID(JobID jobId, boolean isMap, int id) { + this(jobId, isMap ? TaskType.MAP : TaskType.REDUCE, id); + } + + /** + * Constructs a TaskInProgressId object from given parts. + * @param jtIdentifier jobTracker identifier + * @param jobId job number + * @param isMap whether the tip is a map + * @param id the tip number + */ + @Deprecated + public TaskID(String jtIdentifier, int jobId, boolean isMap, int id) { + this(new JobID(jtIdentifier, jobId), isMap, id); + } public TaskID() { jobId = new JobID(); Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Mon Sep 30 18:28:07 2013 @@ -1181,4 +1181,28 @@ <description>ACL of who can be admin of the History server.</description> </property> +<property> + <name>mapreduce.jobhistory.recovery.enable</name> + <value>false</value> + <description>Enable the history server to store server state and recover + server state upon startup. If enabled then + mapreduce.jobhistory.recovery.store.class must be specified.</description> +</property> + +<property> + <name>mapreduce.jobhistory.recovery.store.class</name> + <value>org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService</value> + <description>The HistoryServerStateStoreService class to store history server + state for recovery.</description> +</property> + +<property> + <name>mapreduce.jobhistory.recovery.store.fs.uri</name> + <value>${hadoop.tmp.dir}/mapred/history/recoverystore</value> + <!--value>hdfs://localhost:9000/mapred/history/recoverystore</value--> + <description>The URI where history server state will be stored if + HistoryServerFileSystemStateStoreService is configured as the recovery + storage class.</description> +</property> + </configuration> Propchange: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1526971-1527683 Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java Mon Sep 30 18:28:07 2013 @@ -18,10 +18,17 @@ package org.apache.hadoop.mapreduce.v2.hs; +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.DelegationKey; /** * A MapReduce specific delegation token secret manager. @@ -33,6 +40,11 @@ import org.apache.hadoop.security.token. public class JHSDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager<MRDelegationTokenIdentifier> { + private static final Log LOG = LogFactory.getLog( + JHSDelegationTokenSecretManager.class); + + private HistoryServerStateStoreService store; + /** * Create a secret manager * @param delegationKeyUpdateInterval the number of seconds for rolling new @@ -42,17 +54,94 @@ public class JHSDelegationTokenSecretMan * @param delegationTokenRenewInterval how often the tokens must be renewed * @param delegationTokenRemoverScanInterval how often the tokens are scanned * for expired tokens + * @param store history server state store for persisting state */ public JHSDelegationTokenSecretManager(long delegationKeyUpdateInterval, long delegationTokenMaxLifetime, long delegationTokenRenewInterval, - long delegationTokenRemoverScanInterval) { + long delegationTokenRemoverScanInterval, + HistoryServerStateStoreService store) { super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, delegationTokenRenewInterval, delegationTokenRemoverScanInterval); + this.store = store; } @Override public MRDelegationTokenIdentifier createIdentifier() { return new MRDelegationTokenIdentifier(); } + + @Override + protected void storeNewMasterKey(DelegationKey key) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing master key " + key.getKeyId()); + } + try { + store.storeTokenMasterKey(key); + } catch (IOException e) { + LOG.error("Unable to store master key " + key.getKeyId(), e); + } + } + + @Override + protected void removeStoredMasterKey(DelegationKey key) { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing master key " + key.getKeyId()); + } + try { + store.removeTokenMasterKey(key); + } catch (IOException e) { + LOG.error("Unable to remove master key " + key.getKeyId(), e); + } + } + + @Override + protected void storeNewToken(MRDelegationTokenIdentifier tokenId, + long renewDate) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token " + tokenId.getSequenceNumber()); + } + try { + store.storeToken(tokenId, renewDate); + } catch (IOException e) { + LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e); + } + } + + @Override + protected void removeStoredToken(MRDelegationTokenIdentifier tokenId) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Storing token " + tokenId.getSequenceNumber()); + } + try { + store.removeToken(tokenId); + } catch (IOException e) { + LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e); + } + } + + @Override + protected void updateStoredToken(MRDelegationTokenIdentifier tokenId, + long renewDate) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating token " + tokenId.getSequenceNumber()); + } + try { + store.updateToken(tokenId, renewDate); + } catch (IOException e) { + LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e); + } + } + + public void recover(HistoryServerState state) throws IOException { + LOG.info("Recovering " + getClass().getSimpleName()); + for (DelegationKey key : state.tokenMasterKeyState) { + addKey(key); + } + for (Entry<MRDelegationTokenIdentifier, Long> entry : + state.tokenState.entrySet()) { + addPersistedDelegationToken(entry.getKey(), entry.getValue()); + } + } } Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Mon Sep 30 18:28:07 2013 @@ -28,11 +28,13 @@ import org.apache.hadoop.fs.CommonConfig import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState; import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ShutdownHookManager; @@ -64,6 +66,46 @@ public class JobHistoryServer extends Co private JHSDelegationTokenSecretManager jhsDTSecretManager; private AggregatedLogDeletionService aggLogDelService; private HSAdminServer hsAdminServer; + private HistoryServerStateStoreService stateStore; + + // utility class to start and stop secret manager as part of service + // framework and implement state recovery for secret manager on startup + private class HistoryServerSecretManagerService + extends AbstractService { + + public HistoryServerSecretManagerService() { + super(HistoryServerSecretManagerService.class.getName()); + } + + @Override + protected void serviceStart() throws Exception { + boolean recoveryEnabled = getConfig().getBoolean( + JHAdminConfig.MR_HS_RECOVERY_ENABLE, + JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE); + if (recoveryEnabled) { + assert stateStore.isInState(STATE.STARTED); + HistoryServerState state = stateStore.loadState(); + jhsDTSecretManager.recover(state); + } + + try { + jhsDTSecretManager.startThreads(); + } catch(IOException io) { + LOG.error("Error while starting the Secret Manager threads", io); + throw io; + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + if (jhsDTSecretManager != null) { + jhsDTSecretManager.stopThreads(); + } + super.serviceStop(); + } + } public JobHistoryServer() { super(JobHistoryServer.class.getName()); @@ -86,11 +128,14 @@ public class JobHistoryServer extends Co } jobHistoryService = new JobHistory(); historyContext = (HistoryContext)jobHistoryService; - this.jhsDTSecretManager = createJHSSecretManager(conf); + stateStore = createStateStore(conf); + this.jhsDTSecretManager = createJHSSecretManager(conf, stateStore); clientService = new HistoryClientService(historyContext, this.jhsDTSecretManager); aggLogDelService = new AggregatedLogDeletionService(); hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService); + addService(stateStore); + addService(new HistoryServerSecretManagerService()); addService(jobHistoryService); addService(clientService); addService(aggLogDelService); @@ -99,7 +144,7 @@ public class JobHistoryServer extends Co } protected JHSDelegationTokenSecretManager createJHSSecretManager( - Configuration conf) { + Configuration conf, HistoryServerStateStoreService store) { long secretKeyInterval = conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY, MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); @@ -111,9 +156,14 @@ public class JobHistoryServer extends Co MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); return new JHSDelegationTokenSecretManager(secretKeyInterval, - tokenMaxLifetime, tokenRenewInterval, 3600000); + tokenMaxLifetime, tokenRenewInterval, 3600000, store); } - + + protected HistoryServerStateStoreService createStateStore( + Configuration conf) { + return HistoryServerStateStoreServiceFactory.getStore(conf); + } + protected void doSecureLogin(Configuration conf) throws IOException { SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB, JHAdminConfig.MR_HISTORY_PRINCIPAL); @@ -123,20 +173,11 @@ public class JobHistoryServer extends Co protected void serviceStart() throws Exception { DefaultMetricsSystem.initialize("JobHistoryServer"); JvmMetrics.initSingleton("JobHistoryServer", null); - try { - jhsDTSecretManager.startThreads(); - } catch(IOException io) { - LOG.error("Error while starting the Secret Manager threads", io); - throw io; - } super.serviceStart(); } @Override protected void serviceStop() throws Exception { - if (jhsDTSecretManager != null) { - jhsDTSecretManager.stopThreads(); - } DefaultMetricsSystem.shutdown(); super.serviceStop(); } Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Mon Sep 30 18:28:07 2013 @@ -72,7 +72,7 @@ public class TestJobHistoryServer { Configuration config = new Configuration(); historyServer.init(config); assertEquals(STATE.INITED, historyServer.getServiceState()); - assertEquals(4, historyServer.getServices().size()); + assertEquals(6, historyServer.getServices().size()); HistoryClientService historyService = historyServer.getClientService(); assertNotNull(historyServer.getClientService()); assertEquals(STATE.INITED, historyService.getServiceState()); Modified: hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1527697&r1=1527696&r2=1527697&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original) +++ hadoop/common/branches/HDFS-2832/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Mon Sep 30 18:28:07 2013 @@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService; import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; @@ -87,10 +88,11 @@ public class TestJHSSecurity { // no keytab based login }; + @Override protected JHSDelegationTokenSecretManager createJHSSecretManager( - Configuration conf) { + Configuration conf, HistoryServerStateStoreService store) { return new JHSDelegationTokenSecretManager(initialInterval, - maxLifetime, renewInterval, 3600000); + maxLifetime, renewInterval, 3600000, store); } }; // final JobHistoryServer jobHistoryServer = jhServer;