YARN-7732. Support Generic AM Simulator from SynthGenerator. (Contributed by 
Young Chen via curino)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84cea001
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84cea001
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84cea001

Branch: refs/heads/HDFS-12996
Commit: 84cea0011ffe510d24cf9f2952944f7a6fe622cf
Parents: 6f81cc0
Author: Carlo Curino <cur...@apache.org>
Authored: Tue Feb 20 17:00:34 2018 -0800
Committer: Carlo Curino <cur...@apache.org>
Committed: Tue Feb 20 17:00:34 2018 -0800

----------------------------------------------------------------------
 hadoop-tools/hadoop-sls/pom.xml                 |   2 +
 .../org/apache/hadoop/yarn/sls/SLSRunner.java   | 137 +++---
 .../hadoop/yarn/sls/appmaster/AMSimulator.java  |   2 +-
 .../yarn/sls/appmaster/MRAMSimulator.java       |   7 +-
 .../yarn/sls/appmaster/StreamAMSimulator.java   | 273 +++++++++++
 .../hadoop/yarn/sls/appmaster/package-info.java |  21 +
 .../hadoop/yarn/sls/synthetic/SynthJob.java     | 367 ++++++++------
 .../yarn/sls/synthetic/SynthJobClass.java       | 180 -------
 .../sls/synthetic/SynthTraceJobProducer.java    | 487 ++++++++++++++++---
 .../yarn/sls/synthetic/SynthWorkload.java       | 121 -----
 .../hadoop/yarn/sls/BaseSLSRunnerTest.java      |   2 +-
 .../hadoop/yarn/sls/TestSLSGenericSynth.java    |  76 +++
 .../hadoop/yarn/sls/TestSLSStreamAMSynth.java   |  76 +++
 .../hadoop/yarn/sls/TestSynthJobGeneration.java | 213 +++++++-
 .../yarn/sls/appmaster/TestAMSimulator.java     |   2 +-
 .../src/test/resources/sls-runner.xml           |   4 +
 .../hadoop-sls/src/test/resources/syn.json      |   2 +-
 .../src/test/resources/syn_generic.json         |  54 ++
 .../src/test/resources/syn_stream.json          |  46 ++
 19 files changed, 1430 insertions(+), 642 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index a7cb9b2..ef5ac54 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -133,6 +133,8 @@
             <exclude>src/test/resources/simulate.info.html.template</exclude>
             <exclude>src/test/resources/track.html.template</exclude>
             <exclude>src/test/resources/syn.json</exclude>
+            <exclude>src/test/resources/syn_generic.json</exclude>
+            <exclude>src/test/resources/syn_stream.json</exclude>
             <exclude>src/test/resources/inputsls.json</exclude>
             <exclude>src/test/resources/nodes.json</exclude>
             <exclude>src/test/resources/exit-invariants.txt</exclude>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
index 456602f..951c09d 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/SLSRunner.java
@@ -47,13 +47,11 @@ import 
org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
-import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -627,89 +625,66 @@ public class SLSRunner extends Configured implements Tool 
{
     localConf.set("fs.defaultFS", "file:///");
     long baselineTimeMS = 0;
 
-    try {
+    // if we use the nodeFile this could have been not initialized yet.
+    if (stjp == null) {
+      stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
+    }
 
-      // if we use the nodeFile this could have been not initialized yet.
-      if (stjp == null) {
-        stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
-      }
+    SynthJob job = null;
+    // we use stjp, a reference to the job producer instantiated during node
+    // creation
+    while ((job = (SynthJob) stjp.getNextJob()) != null) {
+      // only support MapReduce currently
+      String user = job.getUser();
+      String jobQueue = job.getQueueName();
+      String oldJobId = job.getJobID().toString();
+      long jobStartTimeMS = job.getSubmissionTime();
 
-      SynthJob job = null;
-      // we use stjp, a reference to the job producer instantiated during node
-      // creation
-      while ((job = (SynthJob) stjp.getNextJob()) != null) {
-        // only support MapReduce currently
-        String user = job.getUser();
-        String jobQueue = job.getQueueName();
-        String oldJobId = job.getJobID().toString();
-        long jobStartTimeMS = job.getSubmissionTime();
-
-        // CARLO: Finish time is only used for logging, omit for now
-        long jobFinishTimeMS = -1L;
-
-        if (baselineTimeMS == 0) {
-          baselineTimeMS = jobStartTimeMS;
-        }
-        jobStartTimeMS -= baselineTimeMS;
-        jobFinishTimeMS -= baselineTimeMS;
-        if (jobStartTimeMS < 0) {
-          LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
-          jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
-          jobStartTimeMS = 0;
-        }
-
-        increaseQueueAppNum(jobQueue);
-
-        List<ContainerSimulator> containerList =
-            new ArrayList<ContainerSimulator>();
-        ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
-        Random rand = new Random(stjp.getSeed());
-
-        // map tasks
-        for (int i = 0; i < job.getNumberMaps(); i++) {
-          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.MAP, i, 0);
-          RMNode node =
-              nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
-                  .getNode();
-          String hostname = "/" + node.getRackName() + "/" + 
node.getHostName();
-          long containerLifeTime = tai.getRuntime();
-          Resource containerResource =
-              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
-                  (int) tai.getTaskInfo().getTaskVCores());
-          containerList.add(new ContainerSimulator(containerResource,
-              containerLifeTime, hostname, DEFAULT_MAPPER_PRIORITY, "map"));
-        }
+      // CARLO: Finish time is only used for logging, omit for now
+      long jobFinishTimeMS = jobStartTimeMS + job.getDuration();
 
-        // reduce tasks
-        for (int i = 0; i < job.getNumberReduces(); i++) {
-          TaskAttemptInfo tai = job.getTaskAttemptInfo(TaskType.REDUCE, i, 0);
-          RMNode node =
-              nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
-                  .getNode();
-          String hostname = "/" + node.getRackName() + "/" + 
node.getHostName();
-          long containerLifeTime = tai.getRuntime();
-          Resource containerResource =
-              Resource.newInstance((int) tai.getTaskInfo().getTaskMemory(),
-                  (int) tai.getTaskInfo().getTaskVCores());
-          containerList.add(
-              new ContainerSimulator(containerResource, containerLifeTime,
-                  hostname, DEFAULT_REDUCER_PRIORITY, "reduce"));
-        }
+      if (baselineTimeMS == 0) {
+        baselineTimeMS = jobStartTimeMS;
+      }
+      jobStartTimeMS -= baselineTimeMS;
+      jobFinishTimeMS -= baselineTimeMS;
+      if (jobStartTimeMS < 0) {
+        LOG.warn("Warning: reset job {} start time to 0.", oldJobId);
+        jobFinishTimeMS = jobFinishTimeMS - jobStartTimeMS;
+        jobStartTimeMS = 0;
+      }
 
-        ReservationId reservationId = null;
+      increaseQueueAppNum(jobQueue);
+
+      List<ContainerSimulator> containerList =
+          new ArrayList<ContainerSimulator>();
+      ArrayList<NodeId> keyAsArray = new ArrayList<NodeId>(nmMap.keySet());
+      Random rand = new Random(stjp.getSeed());
+
+      for (SynthJob.SynthTask task : job.getTasks()) {
+        RMNode node = 
nmMap.get(keyAsArray.get(rand.nextInt(keyAsArray.size())))
+            .getNode();
+        String hostname = "/" + node.getRackName() + "/" + node.getHostName();
+        long containerLifeTime = task.getTime();
+        Resource containerResource = Resource
+            .newInstance((int) task.getMemory(), (int) task.getVcores());
+        containerList.add(
+            new ContainerSimulator(containerResource, containerLifeTime,
+                hostname, task.getPriority(), task.getType()));
+      }
 
-        if (job.hasDeadline()) {
-          reservationId =
-              ReservationId.newInstance(this.rm.getStartTime(), AM_ID);
-        }
 
-        runNewAM(SLSUtils.DEFAULT_JOB_TYPE, user, jobQueue, oldJobId,
-            jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
-            job.getDeadline(), getAMContainerResource(null));
+      ReservationId reservationId = null;
 
+      if(job.hasDeadline()){
+        reservationId = ReservationId
+            .newInstance(this.rm.getStartTime(), AM_ID);
       }
-    } finally {
-      stjp.close();
+
+      runNewAM(job.getType(), user, jobQueue, oldJobId,
+          jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
+          job.getDeadline(), getAMContainerResource(null),
+          job.getParams());
     }
   }
 
@@ -753,14 +728,14 @@ public class SLSRunner extends Configured implements Tool 
{
       Resource amContainerResource) {
     runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
         jobFinishTimeMS, containerList, null,  -1,
-        amContainerResource);
+        amContainerResource, null);
   }
 
   private void runNewAM(String jobType, String user,
       String jobQueue, String oldJobId, long jobStartTimeMS,
       long jobFinishTimeMS, List<ContainerSimulator> containerList,
-      ReservationId reservationId, long deadline,
-      Resource amContainerResource) {
+      ReservationId reservationId, long deadline, Resource amContainerResource,
+      Map<String, String> params) {
 
     AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
         amClassMap.get(jobType), new Configuration());
@@ -777,7 +752,7 @@ public class SLSRunner extends Configured implements Tool {
       AM_ID++;
       amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
           jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
-          runner.getStartTimeMS(), amContainerResource);
+          runner.getStartTimeMS(), amContainerResource, params);
       if(reservationId != null) {
         // if we have a ReservationId, delegate reservation creation to
         // AMSim (reservation shape is impl specific)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 5727b5f..bf85fff 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -121,7 +121,7 @@ public abstract class AMSimulator extends TaskRunner.Task {
       List<ContainerSimulator> containerList, ResourceManager resourceManager,
       SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
       String simQueue, boolean tracked, String oldApp, long baseTimeMS,
-      Resource amResource) {
+      Resource amResource, Map<String, String> params) {
     super.init(startTime, startTime + 1000000L * heartbeatInterval,
         heartbeatInterval);
     this.user = simUser;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 18a155c..6f0f85f 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -65,6 +65,9 @@ public class MRAMSimulator extends AMSimulator {
   scheduled when all maps have finished (not support slow-start currently).
   */
 
+  public static final String MAP_TYPE = "map";
+  public static final String REDUCE_TYPE = "reduce";
+
   private static final int PRIORITY_REDUCE = 10;
   private static final int PRIORITY_MAP = 20;
 
@@ -123,10 +126,10 @@ public class MRAMSimulator extends AMSimulator {
       List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
       long traceStartTime, long traceFinishTime, String user, String queue,
       boolean isTracked, String oldAppId, long baselineStartTimeMS,
-      Resource amContainerResource) {
+      Resource amContainerResource, Map<String, String> params) {
     super.init(heartbeatInterval, containerList, rm, se,
         traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
-        baselineStartTimeMS, amContainerResource);
+        baselineStartTimeMS, amContainerResource, params);
     amtype = "mapreduce";
 
     // get map/reduce tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
new file mode 100644
index 0000000..b41f5f2
--- /dev/null
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AMSimulator that simulates streaming services - it keeps tasks
+ * running and resubmits them whenever they fail or complete. It finishes
+ * when the specified duration expires.
+ */
+
+@Private
+@Unstable
+public class StreamAMSimulator extends AMSimulator {
+  /*
+  Vocabulary Used:
+  pending -> requests which are NOT yet sent to RM
+  scheduled -> requests which are sent to RM but not yet assigned
+  assigned -> requests which are assigned to a container
+  completed -> request corresponding to which container has completed
+
+  streams are constantly scheduled. If a streaming job is killed, we restart it
+  */
+
+  private static final int PRIORITY_MAP = 20;
+
+  // pending streams
+  private LinkedList<ContainerSimulator> pendingStreams =
+          new LinkedList<>();
+
+  // scheduled streams
+  private LinkedList<ContainerSimulator> scheduledStreams =
+          new LinkedList<ContainerSimulator>();
+
+  // assigned streams
+  private Map<ContainerId, ContainerSimulator> assignedStreams =
+          new HashMap<ContainerId, ContainerSimulator>();
+
+  // all streams
+  private LinkedList<ContainerSimulator> allStreams =
+          new LinkedList<ContainerSimulator>();
+
+  // finished
+  private boolean isFinished = false;
+  private long duration = 0;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamAMSimulator.class);
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
+      long traceStartTime, long traceFinishTime, String user, String queue,
+      boolean isTracked, String oldAppId, long baselineStartTimeMS,
+      Resource amContainerResource, Map<String, String> params) {
+    super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
+        traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
+        amContainerResource, params);
+    amtype = "stream";
+
+    allStreams.addAll(containerList);
+
+    duration = traceFinishTime - traceStartTime;
+
+    LOG.info("Added new job with {} streams, running for {}",
+        allStreams.size(), duration);
+  }
+
+  @Override
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void processResponseQueue() throws Exception {
+    while (!responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (!response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if(assignedStreams.containsKey(containerId)){
+            // One of our containers completed. Regardless of reason,
+            // we want to maintain our streaming process
+            LOG.debug("Application {} has one streamer finished ({}).", appId,
+                containerId);
+            pendingStreams.add(assignedStreams.remove(containerId));
+          } else if (amContainer.getId().equals(containerId)){
+            // Our am container completed
+            if(cs.getExitStatus() == ContainerExitStatus.SUCCESS){
+              // am container released event (am container completed on 
success)
+              isAMContainerRunning = false;
+              isFinished = true;
+              LOG.info("Application {} goes to finish.", appId);
+            } else {
+              // am container killed - wait for re allocation
+              LOG.info("Application {}'s AM is "
+                  + "going to be killed. Waiting for rescheduling...", appId);
+              isAMContainerRunning = false;
+            }
+          }
+        }
+      }
+
+      // check finished
+      if (isAMContainerRunning &&
+          (System.currentTimeMillis() - simulateStartTimeMS >= duration)) {
+        LOG.debug("Application {} sends out event to clean up"
+                + " its AM container.", appId);
+        isAMContainerRunning = false;
+        isFinished = true;
+        break;
+      }
+
+      // check allocated containers
+      for (Container container : response.getAllocatedContainers()) {
+        if (!scheduledStreams.isEmpty()) {
+          ContainerSimulator cs = scheduledStreams.remove();
+          LOG.debug("Application {} starts to launch a stream ({}).", appId,
+              container.getId());
+          assignedStreams.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId()).addNewContainer(container,
+              cs.getLifeTime());
+        }
+      }
+    }
+  }
+
+  /**
+   * restart running because of the am container killed.
+   */
+  private void restart()
+          throws YarnException, IOException, InterruptedException {
+    // clear
+    isFinished = false;
+    pendingStreams.clear();
+    pendingStreams.addAll(allStreams);
+
+    amContainer = null;
+  }
+
+  private List<ContainerSimulator> mergeLists(List<ContainerSimulator> left,
+      List<ContainerSimulator> right) {
+    List<ContainerSimulator> list = new ArrayList<>();
+    list.addAll(left);
+    list.addAll(right);
+    return list;
+  }
+
+  @Override
+  protected void sendContainerRequest()
+          throws YarnException, IOException, InterruptedException {
+
+    // send out request
+    List<ResourceRequest> ask = new ArrayList<>();
+    List<ContainerId> release = new ArrayList<>();
+    if (!isFinished) {
+      if (!pendingStreams.isEmpty()) {
+        ask = packageRequests(mergeLists(pendingStreams, scheduledStreams),
+            PRIORITY_MAP);
+        LOG.debug("Application {} sends out request for {} streams.",
+            appId, pendingStreams.size());
+        scheduledStreams.addAll(pendingStreams);
+        pendingStreams.clear();
+      }
+    }
+
+    if(isFinished){
+      release.addAll(assignedStreams.keySet());
+      ask.clear();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask, release);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+        new PrivilegedExceptionAction<AllocateResponse>() {
+          @Override
+          public AllocateResponse run() throws Exception {
+            return rm.getApplicationMasterService().allocate(request);
+          }
+        });
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
+  @Override
+  public void initReservation(
+      ReservationId reservationId, long deadline, long now){
+    // Streaming AM currently doesn't do reservations
+    setReservationRequest(null);
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public void lastStep() throws Exception {
+    super.lastStep();
+
+    // clear data structures
+    allStreams.clear();
+    assignedStreams.clear();
+    pendingStreams.clear();
+    scheduledStreams.clear();
+    responseQueue.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
new file mode 100644
index 0000000..ead315b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Application Master simulators for the SLS.
+ */
+package org.apache.hadoop.yarn.sls.appmaster;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
index 3ed81e1..27156c7 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -19,19 +19,25 @@ package org.apache.hadoop.yarn.sls.synthetic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.tools.rumen.*;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.MapTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.ReduceTaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskAttemptInfo;
+import org.apache.hadoop.tools.rumen.TaskInfo;
 import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
 
-import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -46,6 +52,9 @@ public class SynthJob implements JobStory {
   @SuppressWarnings("StaticVariableName")
   private static Log LOG = LogFactory.getLog(SynthJob.class);
 
+  private static final long MIN_MEMORY = 1024;
+  private static final long MIN_VCORES = 1;
+
   private final Configuration conf;
   private final int id;
 
@@ -53,75 +62,93 @@ public class SynthJob implements JobStory {
   private static final AtomicInteger sequence = new AtomicInteger(0);
   private final String name;
   private final String queueName;
-  private final SynthJobClass jobClass;
+  private final SynthTraceJobProducer.JobDefinition jobDef;
+
+  private String type;
 
   // job timing
   private final long submitTime;
   private final long duration;
   private final long deadline;
 
-  private final int numMapTasks;
-  private final int numRedTasks;
-  private final long mapMaxMemory;
-  private final long reduceMaxMemory;
-  private final long mapMaxVcores;
-  private final long reduceMaxVcores;
-  private final long[] mapRuntime;
-  private final float[] reduceRuntime;
-  private long totMapRuntime;
-  private long totRedRuntime;
+  private Map<String, String> params;
+
+  private long totalSlotTime = 0;
+
+  // task information
+  private List<SynthTask> tasks = new ArrayList<>();
+  private Map<String, List<SynthTask>> taskByType = new HashMap<>();
+  private Map<String, Integer> taskCounts = new HashMap<>();
+  private Map<String, Long> taskMemory = new HashMap<>();
+  private Map<String, Long> taskVcores = new HashMap<>();
+
+  /**
+   * Nested class used to represent a task instance in a job. Each task
+   * corresponds to one container allocation for the job.
+   */
+  public static final class SynthTask{
+    private String type;
+    private long time;
+    private long maxMemory;
+    private long maxVcores;
+    private int priority;
+
+    private SynthTask(String type, long time, long maxMemory, long maxVcores,
+        int priority){
+      this.type = type;
+      this.time = time;
+      this.maxMemory = maxMemory;
+      this.maxVcores = maxVcores;
+      this.priority = priority;
+    }
+
+    public String getType(){
+      return type;
+    }
 
-  public SynthJob(JDKRandomGenerator rand, Configuration conf,
-      SynthJobClass jobClass, long actualSubmissionTime) {
+    public long getTime(){
+      return time;
+    }
 
-    this.conf = conf;
-    this.jobClass = jobClass;
-
-    this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
-    this.numMapTasks = jobClass.getMtasks();
-    this.numRedTasks = jobClass.getRtasks();
-
-    // sample memory distributions, correct for sub-minAlloc sizes
-    long tempMapMaxMemory = jobClass.getMapMaxMemory();
-    this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
-        ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
-    long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
-    this.reduceMaxMemory =
-            tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
-            ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
-
-    // sample vcores distributions, correct for sub-minAlloc sizes
-    long tempMapMaxVCores = jobClass.getMapMaxVcores();
-    this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
-        ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
-    long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
-    this.reduceMaxVcores =
-        tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
-            ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
-
-    if (numMapTasks > 0) {
-      conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
-      conf.set(MRJobConfig.MAP_JAVA_OPTS,
-          "-Xmx" + (this.mapMaxMemory - 100) + "m");
+    public long getMemory(){
+      return maxMemory;
     }
 
-    if (numRedTasks > 0) {
-      conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
-      conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
-          "-Xmx" + (this.reduceMaxMemory - 100) + "m");
+    public long getVcores(){
+      return maxVcores;
     }
 
-    boolean hasDeadline =
-        (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
+    public int getPriority(){
+      return priority;
+    }
+
+    @Override
+    public String toString(){
+      return String.format("[task]\ttype: %1$-10s\ttime: %2$3s\tmemory: "
+              + "%3$4s\tvcores: %4$2s%n", getType(), getTime(), getMemory(),
+          getVcores());
+    }
+  }
 
-    LogNormalDistribution deadlineFactor =
-        SynthUtils.getLogNormalDist(rand, 
jobClass.jobClass.deadline_factor_avg,
-            jobClass.jobClass.deadline_factor_stddev);
 
-    double deadlineFactorSample =
-        (deadlineFactor != null) ? deadlineFactor.sample() : -1;
+  protected SynthJob(JDKRandomGenerator rand, Configuration conf,
+      SynthTraceJobProducer.JobDefinition jobDef,
+      String queue, long actualSubmissionTime) {
 
-    this.queueName = jobClass.workload.getQueueName();
+    this.conf = conf;
+    this.jobDef = jobDef;
+
+    this.queueName = queue;
+
+    this.duration = MILLISECONDS.convert(jobDef.duration.getInt(),
+        SECONDS);
+
+    boolean hasDeadline =
+        (rand.nextDouble() <= jobDef.reservation.getDouble());
+
+    double deadlineFactorSample = jobDef.deadline_factor.getDouble();
+
+    this.type = jobDef.type;
 
     this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
 
@@ -129,6 +156,8 @@ public class SynthJob implements JobStory {
         hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
             + (long) Math.ceil(deadlineFactorSample * duration) : -1;
 
+    this.params = jobDef.params;
+
     conf.set(QUEUE_NAME, queueName);
 
     // name and initialize job randomness
@@ -136,79 +165,166 @@ public class SynthJob implements JobStory {
     rand.setSeed(seed);
     id = sequence.getAndIncrement();
 
-    name = String.format(jobClass.getClassName() + "_%06d", id);
+    name = String.format(jobDef.class_name + "_%06d", id);
     LOG.debug(name + " (" + seed + ")");
 
     LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
         + " deadline:" + deadline + " duration:" + duration
         + " deadline-submission: " + (deadline - submitTime));
 
-    // generate map and reduce runtimes
-    mapRuntime = new long[numMapTasks];
-    for (int i = 0; i < numMapTasks; i++) {
-      mapRuntime[i] = jobClass.getMapTimeSample();
-      totMapRuntime += mapRuntime[i];
-    }
-    reduceRuntime = new float[numRedTasks];
-    for (int i = 0; i < numRedTasks; i++) {
-      reduceRuntime[i] = jobClass.getReduceTimeSample();
-      totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
+    // Expand tasks
+    for(SynthTraceJobProducer.TaskDefinition task : jobDef.tasks){
+      int num = task.count.getInt();
+      String taskType = task.type;
+      long memory = task.max_memory.getLong();
+      memory = memory < MIN_MEMORY ? MIN_MEMORY: memory;
+      long vcores = task.max_vcores.getLong();
+      vcores = vcores < MIN_VCORES ? MIN_VCORES  : vcores;
+      int priority = task.priority;
+
+      // Save task information by type
+      taskByType.put(taskType, new ArrayList<>());
+      taskCounts.put(taskType, num);
+      taskMemory.put(taskType, memory);
+      taskVcores.put(taskType, vcores);
+
+      for(int i = 0; i < num; ++i){
+        long time = task.time.getLong();
+        totalSlotTime += time;
+        SynthTask t = new SynthTask(taskType, time, memory, vcores,
+            priority);
+        tasks.add(t);
+        taskByType.get(taskType).add(t);
+      }
     }
+
+  }
+
+  public String getType(){
+    return type;
+  }
+
+  public List<SynthTask> getTasks(){
+    return tasks;
   }
 
   public boolean hasDeadline() {
     return deadline > 0;
   }
 
-  @Override
   public String getName() {
     return name;
   }
 
-  @Override
   public String getUser() {
-    return jobClass.getUserName();
+    return jobDef.user_name;
   }
 
-  @Override
   public JobID getJobID() {
     return new JobID("job_mock_" + name, id);
   }
 
+  public long getSubmissionTime() {
+    return submitTime;
+  }
+
+  public String getQueueName() {
+    return queueName;
+  }
+
   @Override
-  public Values getOutcome() {
-    return Values.SUCCESS;
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    String res = "\nSynthJob [" + jobDef.class_name + "]: \n"
+        + "\tname: " + getName() + "\n"
+        + "\ttype: " + getType() + "\n"
+        + "\tid: " + id + "\n"
+        + "\tqueue: " + getQueueName() + "\n"
+        + "\tsubmission: " + getSubmissionTime() + "\n"
+        + "\tduration: " + getDuration() + "\n"
+        + "\tdeadline: " + getDeadline() + "\n";
+    sb.append(res);
+    int taskno = 0;
+    for(SynthJob.SynthTask t : getTasks()){
+      sb.append("\t");
+      sb.append(taskno);
+      sb.append(": \t");
+      sb.append(t.toString());
+      taskno++;
+    }
+    return sb.toString();
+  }
+
+  public long getTotalSlotTime() {
+    return totalSlotTime;
+  }
+
+  public long getDuration() {
+    return duration;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  public Map<String, String> getParams() {
+    return params;
   }
 
   @Override
-  public long getSubmissionTime() {
-    return submitTime;
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJob)) {
+      return false;
+    }
+    SynthJob o = (SynthJob) other;
+    return tasks.equals(o.tasks)
+        && submitTime == o.submitTime
+        && type.equals(o.type)
+        && queueName.equals(o.queueName)
+        && jobDef.class_name.equals(o.jobDef.class_name);
+  }
+
+  @Override
+  public int hashCode() {
+    return jobDef.class_name.hashCode()
+        * (int) submitTime * (int) duration;
+  }
+
+
+  @Override
+  public JobConf getJobConf() {
+    return new JobConf(conf);
   }
 
   @Override
   public int getNumberMaps() {
-    return numMapTasks;
+    return taskCounts.get(MRAMSimulator.MAP_TYPE);
   }
 
   @Override
   public int getNumberReduces() {
-    return numRedTasks;
+    return taskCounts.get(MRAMSimulator.REDUCE_TYPE);
+  }
+
+  @Override
+  public InputSplit[] getInputSplits() {
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
-    switch (taskType) {
+    switch(taskType){
     case MAP:
-      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1,
+          taskMemory.get(MRAMSimulator.MAP_TYPE),
+          taskVcores.get(MRAMSimulator.MAP_TYPE));
     case REDUCE:
-      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+      return new TaskInfo(-1, -1, -1, -1,
+          taskMemory.get(MRAMSimulator.REDUCE_TYPE),
+          taskVcores.get(MRAMSimulator.REDUCE_TYPE));
     default:
-      throw new IllegalArgumentException("Not interested");
+      break;
     }
-  }
-
-  @Override
-  public InputSplit[] getInputSplits() {
     throw new UnsupportedOperationException();
   }
 
@@ -218,17 +334,20 @@ public class SynthJob implements JobStory {
     switch (taskType) {
     case MAP:
       return new MapTaskAttemptInfo(State.SUCCEEDED,
-          getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
-
+          getTaskInfo(taskType, taskNumber),
+          taskByType.get(MRAMSimulator.MAP_TYPE).get(taskNumber).time,
+          null);
     case REDUCE:
       // We assume uniform split between pull/sort/reduce
       // aligned with naive progress reporting assumptions
       return new ReduceTaskAttemptInfo(State.SUCCEEDED,
           getTaskInfo(taskType, taskNumber),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)),
-          (long) Math.round((reduceRuntime[taskNumber] / 3)), null);
-
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3,
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3,
+          taskByType.get(MRAMSimulator.MAP_TYPE)
+              .get(taskNumber).time / 3, null);
     default:
       break;
     }
@@ -242,65 +361,7 @@ public class SynthJob implements JobStory {
   }
 
   @Override
-  public org.apache.hadoop.mapred.JobConf getJobConf() {
-    return new JobConf(conf);
-  }
-
-  @Override
-  public String getQueueName() {
-    return queueName;
-  }
-
-  @Override
-  public String toString() {
-    return "SynthJob [\n" + "  workload=" + jobClass.getWorkload().getId()
-        + "\n" + "  jobClass="
-        + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
-        + "  conf=" + conf + ",\n" + "  id=" + id + ",\n" + "  name=" + name
-        + ",\n" + "  mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
-        + "  reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
-        + "  submitTime=" + submitTime + ",\n" + "  numMapTasks=" + numMapTasks
-        + ",\n" + "  numRedTasks=" + numRedTasks + ",\n" + "  mapMaxMemory="
-        + mapMaxMemory + ",\n" + "  reduceMaxMemory=" + reduceMaxMemory + ",\n"
-        + "  queueName=" + queueName + "\n" + "]";
-  }
-
-  public SynthJobClass getJobClass() {
-    return jobClass;
-  }
-
-  public long getTotalSlotTime() {
-    return totMapRuntime + totRedRuntime;
-  }
-
-  public long getDuration() {
-    return duration;
-  }
-
-  public long getDeadline() {
-    return deadline;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthJob)) {
-      return false;
-    }
-    SynthJob o = (SynthJob) other;
-    return Arrays.equals(mapRuntime, o.mapRuntime)
-        && Arrays.equals(reduceRuntime, o.reduceRuntime)
-        && submitTime == o.submitTime && numMapTasks == o.numMapTasks
-        && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
-        && reduceMaxMemory == o.reduceMaxMemory
-        && mapMaxVcores == o.mapMaxVcores
-        && reduceMaxVcores == o.reduceMaxVcores && 
queueName.equals(o.queueName)
-        && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
-        && totRedRuntime == o.totRedRuntime;
-  }
-
-  @Override
-  public int hashCode() {
-    // could have a bad distr; investigate if a relevant use case exists
-    return jobClass.hashCode() * (int) submitTime;
+  public Values getOutcome() {
+    return Values.SUCCESS;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
deleted file mode 100644
index 439698f..0000000
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.commons.math3.distribution.AbstractRealDistribution;
-import org.apache.commons.math3.distribution.LogNormalDistribution;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.tools.rumen.JobStory;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-/**
- * This is a class that represent a class of Jobs. It is used to generate an
- * individual job, by picking random durations, task counts, container size,
- * etc.
- */
-public class SynthJobClass {
-
-  private final JDKRandomGenerator rand;
-  private final LogNormalDistribution dur;
-  private final LogNormalDistribution mapRuntime;
-  private final LogNormalDistribution redRuntime;
-  private final LogNormalDistribution mtasks;
-  private final LogNormalDistribution rtasks;
-  private final LogNormalDistribution mapMem;
-  private final LogNormalDistribution redMem;
-  private final LogNormalDistribution mapVcores;
-  private final LogNormalDistribution redVcores;
-
-  private final Trace trace;
-  @SuppressWarnings("VisibilityModifier")
-  protected final SynthWorkload workload;
-  @SuppressWarnings("VisibilityModifier")
-  protected final JobClass jobClass;
-
-  public SynthJobClass(JDKRandomGenerator rand, Trace trace,
-      SynthWorkload workload, int classId) {
-
-    this.trace = trace;
-    this.workload = workload;
-    this.rand = new JDKRandomGenerator();
-    this.rand.setSeed(rand.nextLong());
-    jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
-
-    this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
-        jobClass.dur_stddev);
-    this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
-        jobClass.mtime_stddev);
-    this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
-        jobClass.rtime_stddev);
-    this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
-        jobClass.mtasks_stddev);
-    this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
-        jobClass.rtasks_stddev);
-
-    this.mapMem = SynthUtils.getLogNormalDist(rand, 
jobClass.map_max_memory_avg,
-        jobClass.map_max_memory_stddev);
-    this.redMem = SynthUtils.getLogNormalDist(rand,
-        jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
-    this.mapVcores = SynthUtils.getLogNormalDist(rand,
-        jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
-    this.redVcores = SynthUtils.getLogNormalDist(rand,
-        jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
-  }
-
-  public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
-    return new SynthJob(rand, conf, this, actualSubmissionTime);
-  }
-
-  @Override
-  public String toString() {
-    return "SynthJobClass [workload=" + workload.getName() + ", class="
-        + jobClass.class_name + " job_count=" + jobClass.class_weight + ", 
dur="
-        + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
-        + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
-        + ", redRuntime="
-        + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
-        + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
-        + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
-        + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
-
-  }
-
-  public double getClassWeight() {
-    return jobClass.class_weight;
-  }
-
-  public long getDur() {
-    return genLongSample(dur);
-  }
-
-  public int getMtasks() {
-    return genIntSample(mtasks);
-  }
-
-  public int getRtasks() {
-    return genIntSample(rtasks);
-  }
-
-  public long getMapMaxMemory() {
-    return genLongSample(mapMem);
-  }
-
-  public long getReduceMaxMemory() {
-    return genLongSample(redMem);
-  }
-
-  public long getMapMaxVcores() {
-    return genLongSample(mapVcores);
-  }
-
-  public long getReduceMaxVcores() {
-    return genLongSample(redVcores);
-  }
-
-  public SynthWorkload getWorkload() {
-    return workload;
-  }
-
-  public int genIntSample(AbstractRealDistribution dist) {
-    if (dist == null) {
-      return 0;
-    }
-    double baseSample = dist.sample();
-    if (baseSample < 0) {
-      baseSample = 0;
-    }
-    return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
-  }
-
-  public long genLongSample(AbstractRealDistribution dist) {
-    return dist != null ? (long) Math.ceil(dist.sample()) : 0;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthJobClass)) {
-      return false;
-    }
-    SynthJobClass o = (SynthJobClass) other;
-    return workload.equals(o.workload);
-  }
-
-  @Override
-  public int hashCode() {
-    return workload.hashCode() * workload.getId();
-  }
-
-  public String getClassName() {
-    return jobClass.class_name;
-  }
-
-  public long getMapTimeSample() {
-    return genLongSample(mapRuntime);
-  }
-
-  public long getReduceTimeSample() {
-    return genLongSample(redRuntime);
-  }
-
-  public String getUserName() {
-    return jobClass.user_name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
index c89e4e2..09bc9b9 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.sls.synthetic;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -26,7 +27,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
+import org.codehaus.jackson.annotate.JsonCreator;
 import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
 
 import javax.xml.bind.annotation.XmlRootElement;
@@ -39,7 +44,7 @@ import static 
org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNK
 
 /**
  * This is a JobStoryProducer that operates from distribution of different
- * workloads. The .json input file is used to determine how many jobs, which
+ * workloads. The .json input file is used to determine how many weight, which
  * size, number of maps/reducers and their duration, as well as the temporal
  * distributed of submissions. For each parameter we control avg and stdev, and
  * generate values via normal or log-normal distributions.
@@ -55,8 +60,6 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
   private final long seed;
 
   private int totalWeight;
-  private final List<Double> weightList;
-  private final Map<Integer, SynthWorkload> workloads;
 
   private final Queue<StoryParams> listStoryParams;
 
@@ -65,6 +68,9 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
   public static final String SLS_SYNTHETIC_TRACE_FILE =
       "sls.synthetic" + ".trace_file";
 
+  private final static int DEFAULT_MAPPER_PRIORITY = 20;
+  private final static int DEFAULT_REDUCER_PRIORITY = 10;
+
   public SynthTraceJobProducer(Configuration conf) throws IOException {
     this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
   }
@@ -76,8 +82,6 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
 
     this.conf = conf;
     this.rand = new JDKRandomGenerator();
-    workloads = new HashMap<Integer, SynthWorkload>();
-    weightList = new ArrayList<Double>();
 
     ObjectMapper mapper = new ObjectMapper();
     mapper.configure(INTERN_FIELD_NAMES, true);
@@ -86,44 +90,132 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
     FileSystem ifs = path.getFileSystem(conf);
     FSDataInputStream fileIn = ifs.open(path);
 
+    // Initialize the random generator and the seed
     this.trace = mapper.readValue(fileIn, Trace.class);
-    seed = trace.rand_seed;
-    rand.setSeed(seed);
+    this.seed = trace.rand_seed;
+    this.rand.setSeed(seed);
+    // Initialize the trace
+    this.trace.init(rand);
 
     this.numJobs = new AtomicInteger(trace.num_jobs);
 
-    for (int workloadId = 0; workloadId < trace.workloads
-        .size(); workloadId++) {
-      SynthWorkload workload = new SynthWorkload(workloadId, trace);
-      for (int classId =
-          0; classId < trace.workloads.get(workloadId).job_classes
-              .size(); classId++) {
-        SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId);
-        workload.add(cls);
-      }
-      workloads.put(workloadId, workload);
+    for (Double w : trace.workload_weights) {
+      totalWeight += w;
     }
 
-    for (int i = 0; i < workloads.size(); i++) {
-      double w = workloads.get(i).getWorkloadWeight();
-      totalWeight += w;
-      weightList.add(w);
+    // Initialize our story parameters
+    listStoryParams = createStory();
+
+    LOG.info("Generated " + listStoryParams.size() + " deadlines for "
+        + this.numJobs.get() + " jobs");
+  }
+
+  // StoryParams hold the minimum amount of information needed to completely
+  // specify a job run: job definition, start time, and queue.
+  // This allows us to create "jobs" and then order them according to start 
time
+  static class StoryParams {
+    // Time the job gets submitted to
+    private long actualSubmissionTime;
+    // The queue the job gets submitted to
+    private String queue;
+    // Definition to construct the job from
+    private JobDefinition jobDef;
+
+    StoryParams(long actualSubmissionTime, String queue, JobDefinition jobDef) 
{
+      this.actualSubmissionTime = actualSubmissionTime;
+      this.queue = queue;
+      this.jobDef = jobDef;
     }
+  }
 
+
+  private Queue<StoryParams> createStory() {
     // create priority queue to keep start-time sorted
-    listStoryParams =
-        new PriorityQueue<StoryParams>(10, new Comparator<StoryParams>() {
+    Queue<StoryParams> storyQueue =
+        new PriorityQueue<>(this.numJobs.get(), new Comparator<StoryParams>() {
           @Override
           public int compare(StoryParams o1, StoryParams o2) {
             return Math
-                .toIntExact(o2.actualSubmissionTime - o1.actualSubmissionTime);
+                .toIntExact(o1.actualSubmissionTime - o2.actualSubmissionTime);
           }
         });
+    for (int i = 0; i < numJobs.get(); i++) {
+      // Generate a workload
+      Workload wl = trace.generateWorkload();
+      // Save all the parameters needed to completely define a job
+      long actualSubmissionTime = wl.generateSubmissionTime();
+      String queue = wl.queue_name;
+      JobDefinition job = wl.generateJobDefinition();
+      storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
+    }
+    return storyQueue;
+  }
 
-    // initialize it
-    createStoryParams();
-    LOG.info("Generated " + listStoryParams.size() + " deadlines for "
-        + this.numJobs.get() + " jobs ");
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.decrementAndGet() < 0) {
+      return null;
+    }
+    StoryParams storyParams = listStoryParams.poll();
+    return new SynthJob(rand, conf, storyParams.jobDef, storyParams.queue,
+        storyParams.actualSubmissionTime);
+  }
+
+  @Override
+  public void close(){
+  }
+
+  @Override
+  public String toString() {
+    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+        + ", r=" + rand + ", totalWeight="
+        + totalWeight + ", workloads=" + trace.workloads + "]";
+  }
+
+  public int getNumJobs() {
+    return trace.num_jobs;
+  }
+
+  // Helper to parse and maintain backwards compatibility with
+  // syn json formats
+  private static void validateJobDef(JobDefinition jobDef){
+    if(jobDef.tasks == null) {
+      LOG.info("Detected old JobDefinition format. Converting.");
+      try {
+        jobDef.tasks = new ArrayList<>();
+        jobDef.type = "mapreduce";
+        jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg,
+            jobDef.deadline_factor_stddev);
+        jobDef.duration = new Sample(jobDef.dur_avg,
+            jobDef.dur_stddev);
+        jobDef.reservation = new Sample(jobDef.chance_of_reservation);
+
+        TaskDefinition map = new TaskDefinition();
+        map.type = MRAMSimulator.MAP_TYPE;
+        map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev);
+        map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev);
+        map.max_memory = new Sample((double) jobDef.map_max_memory_avg,
+            jobDef.map_max_memory_stddev);
+        map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
+            jobDef.map_max_vcores_stddev);
+        map.priority = DEFAULT_MAPPER_PRIORITY;
+
+        jobDef.tasks.add(map);
+        TaskDefinition reduce = new TaskDefinition();
+        reduce.type = MRAMSimulator.REDUCE_TYPE;
+        reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev);
+        reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev);
+        reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg,
+            jobDef.reduce_max_memory_stddev);
+        reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
+            jobDef.reduce_max_vcores_stddev);
+        reduce.priority = DEFAULT_REDUCER_PRIORITY;
+
+        jobDef.tasks.add(reduce);
+      } catch (JsonMappingException e) {
+        LOG.warn("Error converting old JobDefinition format", e);
+      }
+    }
   }
 
   public long getSeed() {
@@ -159,6 +251,25 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
     @JsonProperty("workloads")
     List<Workload> workloads;
 
+    List<Double> workload_weights;
+    JDKRandomGenerator rand;
+
+    public void init(JDKRandomGenerator random){
+      this.rand = random;
+      // Pass rand forward
+      for(Workload w : workloads){
+        w.init(rand);
+      }
+      // Initialize workload weights
+      workload_weights = new ArrayList<>();
+      for(Workload w : workloads){
+        workload_weights.add(w.workload_weight);
+      }
+    }
+
+    Workload generateWorkload(){
+      return workloads.get(SynthUtils.getWeighted(workload_weights, rand));
+    }
   }
 
   /**
@@ -174,16 +285,67 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
     @JsonProperty("queue_name")
     String queue_name;
     @JsonProperty("job_classes")
-    List<JobClass> job_classes;
+    List<JobDefinition> job_classes;
     @JsonProperty("time_distribution")
     List<TimeSample> time_distribution;
+
+    JDKRandomGenerator rand;
+
+    List<Double> job_weights;
+    List<Double> time_weights;
+
+    public void init(JDKRandomGenerator random){
+      this.rand = random;
+      // Validate and pass rand forward
+      for(JobDefinition def : job_classes){
+        validateJobDef(def);
+        def.init(rand);
+      }
+
+      // Initialize job weights
+      job_weights = new ArrayList<>();
+      job_weights = new ArrayList<>();
+      for(JobDefinition j : job_classes){
+        job_weights.add(j.class_weight);
+      }
+
+      // Initialize time weights
+      time_weights = new ArrayList<>();
+      for(TimeSample ts : time_distribution){
+        time_weights.add(ts.weight);
+      }
+    }
+
+    public long generateSubmissionTime(){
+      int index = SynthUtils.getWeighted(time_weights, rand);
+      // Retrieve the lower and upper bounds for this time "bucket"
+      int start = time_distribution.get(index).time;
+      // Get the beginning of the next time sample (if it exists)
+      index = (index+1)<time_distribution.size() ? index+1 : index;
+      int end = time_distribution.get(index).time;
+      int range = end-start;
+      // Within this time "bucket", uniformly pick a time if our
+      // range is non-zero, otherwise just use the start time of the bucket
+      return start + (range>0 ? rand.nextInt(range) : 0);
+    }
+
+    public JobDefinition generateJobDefinition(){
+      return job_classes.get(SynthUtils.getWeighted(job_weights, rand));
+    }
+
+    @Override
+    public String toString(){
+      return "\nWorkload " + workload_name + ", weight: " + workload_weight
+          + ", queue: " + queue_name + " "
+          + job_classes.toString().replace("\n", "\n\t");
+    }
   }
 
   /**
    * Class used to parse a job class from file.
    */
   @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
-  public static class JobClass {
+  public static class JobDefinition {
 
     @JsonProperty("class_name")
     String class_name;
@@ -194,6 +356,23 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
     @JsonProperty("class_weight")
     double class_weight;
 
+    // am type to launch
+    @JsonProperty("type")
+    String type;
+    @JsonProperty("deadline_factor")
+    Sample deadline_factor;
+    @JsonProperty("duration")
+    Sample duration;
+    @JsonProperty("reservation")
+    Sample reservation;
+
+    @JsonProperty("tasks")
+    List<TaskDefinition> tasks;
+
+    @JsonProperty("params")
+    Map<String, String> params;
+
+    // Old JSON fields for backwards compatibility
     // reservation related params
     @JsonProperty("chance_of_reservation")
     double chance_of_reservation;
@@ -246,71 +425,227 @@ public class SynthTraceJobProducer implements 
JobStoryProducer {
     @JsonProperty("reduce_max_vcores_stddev")
     double reduce_max_vcores_stddev;
 
+    public void init(JDKRandomGenerator rand){
+      deadline_factor.init(rand);
+      duration.init(rand);
+      reservation.init(rand);
+
+      for(TaskDefinition t : tasks){
+        t.count.init(rand);
+        t.time.init(rand);
+        t.max_memory.init(rand);
+        t.max_vcores.init(rand);
+      }
+    }
+
+    @Override
+    public String toString(){
+      return "\nJobDefinition " + class_name + ", weight: " + class_weight
+          + ", type: " + type + " "
+          + tasks.toString().replace("\n", "\n\t");
+    }
   }
 
   /**
-   * This is used to define time-varying probability of a job start-time (e.g.,
-   * to simulate daily patterns).
+   * A task representing a type of container - e.g. "map" in mapreduce
    */
   @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
-  public static class TimeSample {
-    // in sec
+  public static class TaskDefinition {
+
+    @JsonProperty("type")
+    String type;
+    @JsonProperty("count")
+    Sample count;
     @JsonProperty("time")
-    int time;
-    @JsonProperty("weight")
-    double jobs;
+    Sample time;
+    @JsonProperty("max_memory")
+    Sample max_memory;
+    @JsonProperty("max_vcores")
+    Sample max_vcores;
+    @JsonProperty("priority")
+    int priority;
+
+    @Override
+    public String toString(){
+      return "\nTaskDefinition " + type
+          + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory
+          + "] Vcores[" + max_vcores + "] Priority[" + priority + "]";
+    }
   }
 
-  static class StoryParams {
-    private SynthJobClass pickedJobClass;
-    private long actualSubmissionTime;
+  /**
+   * Class used to parse value sample information.
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class Sample {
+    private static final Dist DEFAULT_DIST = Dist.LOGNORM;
+
+    private final double val;
+    private final double std;
+    private final Dist dist;
+    private AbstractRealDistribution dist_instance;
+    private final List<String> discrete;
+    private final List<Double> weights;
+    private final Mode mode;
+
+    private JDKRandomGenerator rand;
+
+    private enum Mode{
+      CONST,
+      DIST,
+      DISC
+    }
 
-    StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) {
-      this.pickedJobClass = pickedJobClass;
-      this.actualSubmissionTime = actualSubmissionTime;
+    private enum Dist{
+      LOGNORM,
+      NORM
     }
-  }
 
+    public Sample(Double val) throws JsonMappingException{
+      this(val, null);
+    }
 
-  void createStoryParams() {
+    public Sample(Double val, Double std) throws JsonMappingException{
+      this(val, std, null, null, null);
+    }
 
-    for (int i = 0; i < numJobs.get(); i++) {
-      int workload = SynthUtils.getWeighted(weightList, rand);
-      SynthWorkload pickedWorkload = workloads.get(workload);
-      long jobClass =
-          SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand);
-      SynthJobClass pickedJobClass =
-          pickedWorkload.getClassList().get((int) jobClass);
-      long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand);
-      // long actualSubmissionTime = (i + 1) * 10;
-      listStoryParams
-          .add(new StoryParams(pickedJobClass, actualSubmissionTime));
+    @JsonCreator
+    public Sample(@JsonProperty("val") Double val,
+        @JsonProperty("std") Double std, @JsonProperty("dist") String dist,
+        @JsonProperty("discrete") List<String> discrete,
+        @JsonProperty("weights") List<Double> weights)
+        throws JsonMappingException{
+      // Different Modes
+      // - Constant: val must be specified, all else null. Sampling will
+      // return val.
+      // - Distribution: val, std specified, dist optional (defaults to
+      // LogNormal). Sampling will sample from the appropriate distribution
+      // - Discrete: discrete must be set to a list of strings or numbers,
+      // weights optional (defaults to uniform)
+
+      if(val!=null){
+        if(std==null){
+          // Constant
+          if(dist!=null || discrete!=null || weights!=null){
+            throw new JsonMappingException("Instantiation of " + Sample.class
+                + " failed");
+          }
+          mode = Mode.CONST;
+          this.val = val;
+          this.std = 0;
+          this.dist = null;
+          this.discrete = null;
+          this.weights = null;
+        } else {
+          // Distribution
+          if(discrete!=null || weights != null){
+            throw new JsonMappingException("Instantiation of " + Sample.class
+                + " failed");
+          }
+          mode = Mode.DIST;
+          this.val = val;
+          this.std = std;
+          this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST;
+          this.discrete = null;
+          this.weights = null;
+        }
+      } else {
+        // Discrete
+        if(discrete==null){
+          throw new JsonMappingException("Instantiation of " + Sample.class
+              + " failed");
+        }
+        mode = Mode.DISC;
+        this.val = 0;
+        this.std = 0;
+        this.dist = null;
+        this.discrete = discrete;
+        if(weights == null){
+          weights = new ArrayList<>(Collections.nCopies(
+              discrete.size(), 1.0));
+        }
+        if(weights.size() != discrete.size()){
+          throw new JsonMappingException("Instantiation of " + Sample.class
+              + " failed");
+        }
+        this.weights = weights;
+      }
     }
-  }
 
-  @Override
-  public JobStory getNextJob() throws IOException {
-    if (numJobs.decrementAndGet() < 0) {
-      return null;
+    public void init(JDKRandomGenerator random){
+      if(this.rand != null){
+        throw new YarnRuntimeException("init called twice");
+      }
+      this.rand = random;
+      if(mode == Mode.DIST){
+        switch(this.dist){
+        case LOGNORM:
+          this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std);
+          return;
+        case NORM:
+          this.dist_instance = SynthUtils.getNormalDist(rand, val, std);
+          return;
+        default:
+          throw new YarnRuntimeException("Unknown distribution " + 
dist.name());
+        }
+      }
     }
-    StoryParams storyParams = listStoryParams.poll();
-    return storyParams.pickedJobClass.getJobStory(conf,
-        storyParams.actualSubmissionTime);
-  }
 
-  @Override
-  public void close() {
-  }
+    public int getInt(){
+      return Math.toIntExact(getLong());
+    }
 
-  @Override
-  public String toString() {
-    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
-        + ", weightList=" + weightList + ", r=" + rand + ", totalWeight="
-        + totalWeight + ", workloads=" + workloads + "]";
-  }
+    public long getLong(){
+      return Math.round(getDouble());
+    }
+
+    public double getDouble(){
+      return Double.parseDouble(getString());
+    }
+
+    public String getString(){
+      if(this.rand == null){
+        throw new YarnRuntimeException("getValue called without init");
+      }
+      switch(mode){
+      case CONST:
+        return Double.toString(val);
+      case DIST:
+        return Double.toString(dist_instance.sample());
+      case DISC:
+        return this.discrete.get(SynthUtils.getWeighted(this.weights, rand));
+      default:
+        throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+      }
+    }
+
+    @Override
+    public String toString(){
+      switch(mode){
+      case CONST:
+        return "value: " + Double.toString(val);
+      case DIST:
+        return "value: " + this.val + " std: " + this.std + " dist: "
+            + this.dist.name();
+      case DISC:
+        return "discrete: " + this.discrete + ", weights: " + this.weights;
+      default:
+        throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
+      }
+    }
 
-  public int getNumJobs() {
-    return trace.num_jobs;
   }
 
+  /**
+   * This is used to define time-varying probability of a job start-time (e.g.,
+   * to simulate daily patterns).
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class TimeSample {
+    // in sec
+    @JsonProperty("time")
+    int time;
+    @JsonProperty("weight")
+    double weight;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
deleted file mode 100644
index 9e5fd4e..0000000
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.sls.synthetic;
-
-import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
-
-import java.util.*;
-
-/**
- * This class represent a workload (made up of multiple SynthJobClass(es)). It
- * also stores the temporal distributions of jobs in this workload.
- */
-public class SynthWorkload {
-
-  private final int id;
-  private final List<SynthJobClass> classList;
-  private final Trace trace;
-  private final SortedMap<Integer, Double> timeWeights;
-
-  public SynthWorkload(int identifier, Trace inTrace) {
-    classList = new ArrayList<SynthJobClass>();
-    this.id = identifier;
-    this.trace = inTrace;
-    timeWeights = new TreeMap<Integer, Double>();
-    for (SynthTraceJobProducer.TimeSample ts : trace.workloads
-        .get(id).time_distribution) {
-      timeWeights.put(ts.time, ts.jobs);
-    }
-  }
-
-  public boolean add(SynthJobClass s) {
-    return classList.add(s);
-  }
-
-  public List<Double> getWeightList() {
-    ArrayList<Double> ret = new ArrayList<Double>();
-    for (SynthJobClass s : classList) {
-      ret.add(s.getClassWeight());
-    }
-    return ret;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof SynthWorkload)) {
-      return false;
-    }
-    // assume ID determines job classes by construction
-    return getId() == ((SynthWorkload) other).getId();
-  }
-
-  @Override
-  public int hashCode() {
-    return getId();
-  }
-
-  @Override
-  public String toString() {
-    return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n"
-        + classList + "]\n";
-  }
-
-  public String getName() {
-    return trace.workloads.get(id).workload_name;
-  }
-
-  public double getWorkloadWeight() {
-    return trace.workloads.get(id).workload_weight;
-  }
-
-  public String getQueueName() {
-    return trace.workloads.get(id).queue_name;
-  }
-
-  public long getBaseSubmissionTime(Random rand) {
-
-    // pick based on weights the "bucket" for this start time
-    int position = SynthUtils.getWeighted(timeWeights.values(), rand);
-
-    int[] time = new int[timeWeights.keySet().size()];
-    int index = 0;
-    for (Integer i : timeWeights.keySet()) {
-      time[index++] = i;
-    }
-
-    // uniformly pick a time between start and end time of this bucket
-    int startRange = time[position];
-    int endRange = startRange;
-    // if there is no subsequent bucket pick startRange
-    if (position < timeWeights.keySet().size() - 1) {
-      endRange = time[position + 1];
-      return startRange + rand.nextInt((endRange - startRange));
-    } else {
-      return startRange;
-    }
-  }
-
-  public List<SynthJobClass> getClassList() {
-    return classList;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
index 6b369f2..668be14 100644
--- 
a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
+++ 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/BaseSLSRunnerTest.java
@@ -125,7 +125,7 @@ public abstract class BaseSLSRunnerTest {
       if (!exceptionList.isEmpty()) {
         sls.stop();
         Assert.fail("TestSLSRunner catched exception from child thread "
-            + "(TaskRunner.Task): " + exceptionList);
+            + "(TaskRunner.TaskDefinition): " + exceptionList);
         break;
       }
       timeout--;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
new file mode 100644
index 0000000..79ebe21
--- /dev/null
+++ 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSGenericSynth.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSGenericSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_generic.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new commandline and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84cea001/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
new file mode 100644
index 0000000..a5d30e0
--- /dev/null
+++ 
b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/TestSLSStreamAMSynth.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls;
+
+import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * This test performs simple runs of the SLS with the generic syn json format.
+ */
+@RunWith(value = Parameterized.class)
+@NotThreadSafe
+public class TestSLSStreamAMSynth extends BaseSLSRunnerTest {
+
+  @Parameters(name = "Testing with: {1}, {0}, (nodeFile {3})")
+  public static Collection<Object[]> data() {
+
+    String capScheduler = CapacityScheduler.class.getCanonicalName();
+    String fairScheduler = FairScheduler.class.getCanonicalName();
+    String synthTraceFile = "src/test/resources/syn_stream.json";
+    String nodeFile = "src/test/resources/nodes.json";
+
+    // Test with both schedulers
+    return Arrays.asList(new Object[][] {
+
+        // covering the no nodeFile case
+        {capScheduler, "SYNTH", synthTraceFile, null },
+
+        // covering new commandline and CapacityScheduler
+        {capScheduler, "SYNTH", synthTraceFile, nodeFile },
+
+        // covering FairScheduler
+        {fairScheduler, "SYNTH", synthTraceFile, nodeFile },
+    });
+  }
+
+  @Before
+  public void setup() {
+    ongoingInvariantFile = "src/test/resources/ongoing-invariants.txt";
+    exitInvariantFile = "src/test/resources/exit-invariants.txt";
+  }
+
+  @Test(timeout = 90000)
+  @SuppressWarnings("all")
+  public void testSimulatorRunning() throws Exception {
+    Configuration conf = new Configuration(false);
+    long timeTillShutdownInsec = 20L;
+    runSLS(conf, timeTillShutdownInsec);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to