http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
new file mode 100644
index 0000000..81f6648
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Private
+@Unstable
+public class SLSFairScheduler extends FairScheduler
+    implements SchedulerWrapper, Configurable {
+  private SchedulerMetrics schedulerMetrics;
+  private boolean metricsON;
+  private Tracker tracker;
+
+  private Map<ContainerId, Resource> preemptionContainerMap =
+      new ConcurrentHashMap<>();
+
+  public SchedulerMetrics getSchedulerMetrics() {
+    return schedulerMetrics;
+  }
+
+  public Tracker getTracker() {
+    return tracker;
+  }
+
+  public SLSFairScheduler() {
+    tracker = new Tracker();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConfig(conf);
+
+    metricsON = conf.getBoolean(SLSConfiguration.METRICS_SWITCH, true);
+    if (metricsON) {
+      try {
+        schedulerMetrics = SchedulerMetrics.getInstance(conf,
+            FairScheduler.class);
+        schedulerMetrics.init(this, conf);
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Override
+  public Allocation allocate(ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests, List<ContainerId> containerIds,
+      List<String> blacklistAdditions, List<String> blacklistRemovals,
+      ContainerUpdates updateRequests) {
+    if (metricsON) {
+      final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer()
+          .time();
+      Allocation allocation = null;
+      try {
+        allocation = super.allocate(attemptId, resourceRequests, containerIds,
+            blacklistAdditions, blacklistRemovals, updateRequests);
+        return allocation;
+      } finally {
+        context.stop();
+        schedulerMetrics.increaseSchedulerAllocationCounter();
+        try {
+          updateQueueWithAllocateRequest(allocation, attemptId,
+              resourceRequests, containerIds);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    } else {
+      return super.allocate(attemptId, resourceRequests, containerIds,
+          blacklistAdditions, blacklistRemovals, updateRequests);
+    }
+  }
+
+  @Override
+  public void handle(SchedulerEvent schedulerEvent) {
+    // metrics off
+    if (!metricsON) {
+      super.handle(schedulerEvent);
+      return;
+    }
+
+    // metrics on
+    if(!schedulerMetrics.isRunning()) {
+      schedulerMetrics.setRunning(true);
+    }
+
+    Timer.Context handlerTimer = null;
+    Timer.Context operationTimer = null;
+
+    NodeUpdateSchedulerEventWrapper eventWrapper;
+    try {
+      if (schedulerEvent.getType() == SchedulerEventType.NODE_UPDATE
+          && schedulerEvent instanceof NodeUpdateSchedulerEvent) {
+        eventWrapper = new NodeUpdateSchedulerEventWrapper(
+            (NodeUpdateSchedulerEvent)schedulerEvent);
+        schedulerEvent = eventWrapper;
+        updateQueueWithNodeUpdate(eventWrapper);
+      } else if (
+          schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        // if the app still has an AM container, update resource usage information
+        AppAttemptRemovedSchedulerEvent appRemoveEvent =
+            (AppAttemptRemovedSchedulerEvent) schedulerEvent;
+        ApplicationAttemptId appAttemptId =
+            appRemoveEvent.getApplicationAttemptID();
+        String queueName = getSchedulerApp(appAttemptId).getQueue().getName();
+        SchedulerAppReport app = getSchedulerAppInfo(appAttemptId);
+        if (!app.getLiveContainers().isEmpty()) {  // 0 or 1 live containers
+          // the remaining live container should be the AM container
+          RMContainer rmc = app.getLiveContainers().iterator().next();
+          schedulerMetrics.updateQueueMetricsByRelease(
+              rmc.getContainer().getResource(), queueName);
+        }
+      }
+
+      handlerTimer = schedulerMetrics.getSchedulerHandleTimer().time();
+      operationTimer = schedulerMetrics.getSchedulerHandleTimer(
+          schedulerEvent.getType()).time();
+
+      super.handle(schedulerEvent);
+    } finally {
+      if (handlerTimer != null) {
+        handlerTimer.stop();
+      }
+      if (operationTimer != null) {
+        operationTimer.stop();
+      }
+      schedulerMetrics.increaseSchedulerHandleCounter(schedulerEvent.getType());
+
+      if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
+          && schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
+        SLSRunner.decreaseRemainingApps();
+      }
+    }
+  }
+
+  private void updateQueueWithNodeUpdate(
+      NodeUpdateSchedulerEventWrapper eventWrapper) {
+    RMNodeWrapper node = (RMNodeWrapper) eventWrapper.getRMNode();
+    List<UpdatedContainerInfo> containerList = node.getContainerUpdates();
+    for (UpdatedContainerInfo info : containerList) {
+      for (ContainerStatus status : info.getCompletedContainers()) {
+        ContainerId containerId = status.getContainerId();
+        SchedulerAppReport app = super.getSchedulerAppInfo(
+            containerId.getApplicationAttemptId());
+
+        if (app == null) {
+          // this happens for the AM container
+          // The app has already been removed by the time the NM sends the
+          // release information.
+          continue;
+        }
+
+        int releasedMemory = 0, releasedVCores = 0;
+        if (status.getExitStatus() == ContainerExitStatus.SUCCESS) {
+          for (RMContainer rmc : app.getLiveContainers()) {
+            if (rmc.getContainerId() == containerId) {
+              Resource resource = rmc.getContainer().getResource();
+              releasedMemory += resource.getMemorySize();
+              releasedVCores += resource.getVirtualCores();
+              break;
+            }
+          }
+        } else if (status.getExitStatus() == ContainerExitStatus.ABORTED) {
+          if (preemptionContainerMap.containsKey(containerId)) {
+            Resource preResource = preemptionContainerMap.get(containerId);
+            releasedMemory += preResource.getMemorySize();
+            releasedVCores += preResource.getVirtualCores();
+            preemptionContainerMap.remove(containerId);
+          }
+        }
+        // update queue counters
+        String queue = getSchedulerApp(containerId.getApplicationAttemptId()).
+            getQueueName();
+        schedulerMetrics.updateQueueMetricsByRelease(
+            Resource.newInstance(releasedMemory, releasedVCores), queue);
+      }
+    }
+  }
+
+  private void updateQueueWithAllocateRequest(Allocation allocation,
+      ApplicationAttemptId attemptId,
+      List<ResourceRequest> resourceRequests,
+      List<ContainerId> containerIds) throws IOException {
+    // update queue information
+    Resource pendingResource = Resources.createResource(0, 0);
+    Resource allocatedResource = Resources.createResource(0, 0);
+    // container requested
+    for (ResourceRequest request : resourceRequests) {
+      if (request.getResourceName().equals(ResourceRequest.ANY)) {
+        Resources.addTo(pendingResource,
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+      }
+    }
+    // container allocated
+    for (Container container : allocation.getContainers()) {
+      Resources.addTo(allocatedResource, container.getResource());
+      Resources.subtractFrom(pendingResource, container.getResource());
+    }
+    // container released from AM
+    SchedulerAppReport report = super.getSchedulerAppInfo(attemptId);
+    for (ContainerId containerId : containerIds) {
+      Container container = null;
+      for (RMContainer c : report.getLiveContainers()) {
+        if (c.getContainerId().equals(containerId)) {
+          container = c.getContainer();
+          break;
+        }
+      }
+      if (container != null) {
+        // released allocated containers
+        Resources.subtractFrom(allocatedResource, container.getResource());
+      } else {
+        for (RMContainer c : report.getReservedContainers()) {
+          if (c.getContainerId().equals(containerId)) {
+            container = c.getContainer();
+            break;
+          }
+        }
+        if (container != null) {
+          // released reserved containers
+          Resources.subtractFrom(pendingResource, container.getResource());
+        }
+      }
+    }
+    // containers released/preemption from scheduler
+    Set<ContainerId> preemptionContainers = new HashSet<ContainerId>();
+    if (allocation.getContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getContainerPreemptions());
+    }
+    if (allocation.getStrictContainerPreemptions() != null) {
+      preemptionContainers.addAll(allocation.getStrictContainerPreemptions());
+    }
+    if (!preemptionContainers.isEmpty()) {
+      for (ContainerId containerId : preemptionContainers) {
+        if (!preemptionContainerMap.containsKey(containerId)) {
+          Container container = null;
+          for (RMContainer c : report.getLiveContainers()) {
+            if (c.getContainerId().equals(containerId)) {
+              container = c.getContainer();
+              break;
+            }
+          }
+          if (container != null) {
+            preemptionContainerMap.put(containerId, container.getResource());
+          }
+        }
+
+      }
+    }
+
+    // update metrics
+    String queueName = getSchedulerApp(attemptId).getQueueName();
+    schedulerMetrics.updateQueueMetrics(pendingResource, allocatedResource,
+        queueName);
+  }
+
+  private void initQueueMetrics(FSQueue queue) {
+    if (queue instanceof FSLeafQueue) {
+      schedulerMetrics.initQueueMetric(queue.getQueueName());
+      return;
+    }
+
+    for (FSQueue child : queue.getChildQueues()) {
+      initQueueMetrics(child);
+    }
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    if (metricsON) {
+      initQueueMetrics(getQueueManager().getRootQueue());
+    }
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    try {
+      schedulerMetrics.tearDown();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    super.serviceStop();
+  }
+
+  public String getRealQueueName(String queue) throws YarnException {
+    if (!getQueueManager().exists(queue)) {
+      throw new YarnException("Can't find the queue by the given name: " + queue
+          + "! Please check if queue " + queue + " is in the allocation file.");
+    }
+    return getQueueManager().getQueue(queue).getQueueName();
+  }
+}
+
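
[Side note, not part of the patch: a minimal, hypothetical sketch of the metrics switch that setConf() above reads. SLSRunner normally builds the configuration and instantiates the scheduler, so this standalone wiring is for illustration only.]

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
  import org.apache.hadoop.yarn.sls.scheduler.SLSFairScheduler;

  public class MetricsSwitchSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      // Metrics default to on; turning the switch off makes allocate()/handle()
      // delegate straight to FairScheduler, and no SchedulerMetrics is created.
      conf.setBoolean(SLSConfiguration.METRICS_SWITCH, false);
      SLSFairScheduler scheduler = new SLSFairScheduler();
      scheduler.setConf(conf);
    }
  }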

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
index ecf516d..3c7aa77 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerMetrics.java
@@ -18,66 +18,218 @@
 
 package org.apache.hadoop.yarn.sls.scheduler;
 
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.Locale;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Lock;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-        .ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-        .SchedulerAppReport;
-
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
 import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.web.SLSWebApp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Private
 @Unstable
 public abstract class SchedulerMetrics {
+  private static final String EOL = System.getProperty("line.separator");
+  private static final int SAMPLING_SIZE = 60;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SchedulerMetrics.class);
+
   protected ResourceScheduler scheduler;
   protected Set<String> trackedQueues;
   protected MetricRegistry metrics;
   protected Set<String> appTrackedMetrics;
   protected Set<String> queueTrackedMetrics;
-  
+
+  private Configuration conf;
+  private ScheduledExecutorService pool;
+  private SLSWebApp web;
+
+  // metrics
+  private String metricsOutputDir;
+  private BufferedWriter metricsLogBW;
+  private BufferedWriter jobRuntimeLogBW;
+  private boolean running = false;
+
+  // counters for scheduler allocate/handle operations
+  private Counter schedulerAllocateCounter;
+  private Counter schedulerHandleCounter;
+  private Map<SchedulerEventType, Counter> schedulerHandleCounterMap;
+
+  // Timers for scheduler allocate/handle operations
+  private Timer schedulerAllocateTimer;
+  private Timer schedulerHandleTimer;
+  private Map<SchedulerEventType, Timer> schedulerHandleTimerMap;
+  private List<Histogram> schedulerHistogramList;
+  private Map<Histogram, Timer> histogramTimerMap;
+  private Lock samplerLock;
+  private Lock queueLock;
+
+  static Class getSchedulerMetricsClass(Configuration conf,
+      Class schedulerClass) throws ClassNotFoundException {
+    Class metricClass = null;
+    String schedulerMetricsType = conf.get(schedulerClass.getName());
+    if (schedulerMetricsType != null) {
+      metricClass = Class.forName(schedulerMetricsType);
+    }
+
+    if (schedulerClass.equals(FairScheduler.class)) {
+      metricClass = FairSchedulerMetrics.class;
+    } else if (schedulerClass.equals(CapacityScheduler.class)) {
+      metricClass = CapacitySchedulerMetrics.class;
+    } else if (schedulerClass.equals(FifoScheduler.class)) {
+      metricClass = FifoSchedulerMetrics.class;
+    }
+
+    return metricClass;
+  }
+
+  @SuppressWarnings("unchecked")
+  static SchedulerMetrics getInstance(Configuration conf, Class schedulerClass)
+      throws ClassNotFoundException {
+    Class schedulerMetricClass = getSchedulerMetricsClass(conf, schedulerClass);
+    return (SchedulerMetrics) ReflectionUtils
+        .newInstance(schedulerMetricClass, new Configuration());
+  }
+
   public SchedulerMetrics() {
-    appTrackedMetrics = new HashSet<String>();
+    metrics = new MetricRegistry();
+
+    appTrackedMetrics = new HashSet<>();
     appTrackedMetrics.add("live.containers");
     appTrackedMetrics.add("reserved.containers");
-    queueTrackedMetrics = new HashSet<String>();
+
+    queueTrackedMetrics = new HashSet<>();
+    trackedQueues = new HashSet<>();
+
+    samplerLock = new ReentrantLock();
+    queueLock = new ReentrantLock();
   }
-  
-  public void init(ResourceScheduler scheduler, MetricRegistry metrics) {
-    this.scheduler = scheduler;
-    this.trackedQueues = new HashSet<String>();
-    this.metrics = metrics;
+
+  void init(ResourceScheduler resourceScheduler, Configuration config)
+      throws Exception {
+    this.scheduler = resourceScheduler;
+    this.conf = config;
+
+    metricsOutputDir = conf.get(SLSConfiguration.METRICS_OUTPUT_DIR);
+
+    // register various metrics
+    registerJvmMetrics();
+    registerClusterResourceMetrics();
+    registerContainerAppNumMetrics();
+    registerSchedulerMetrics();
+
+    // .csv output
+    initMetricsCSVOutput();
+
+    // start web app to provide real-time tracking
+    int metricsWebAddressPort = conf.getInt(
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT,
+        SLSConfiguration.METRICS_WEB_ADDRESS_PORT_DEFAULT);
+    web = new SLSWebApp((SchedulerWrapper)scheduler, metricsWebAddressPort);
+    web.start();
+
+    // a thread to update histogram timer
+    pool = new ScheduledThreadPoolExecutor(2);
+    pool.scheduleAtFixedRate(new HistogramsRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // a thread to output metrics for real-time tracking
+    pool.scheduleAtFixedRate(new MetricsLogRunnable(), 0, 1000,
+        TimeUnit.MILLISECONDS);
+
+    // application running information
+    jobRuntimeLogBW =
+        new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+            metricsOutputDir + "/jobruntime.csv"), "UTF-8"));
+    jobRuntimeLogBW.write("JobID,real_start_time,real_end_time," +
+        "simulate_start_time,simulate_end_time" + EOL);
+    jobRuntimeLogBW.flush();
   }
-  
-  public void trackApp(final ApplicationAttemptId appAttemptId,
-                       String oldAppId) {
+
+  public MetricRegistry getMetrics() {
+    return metrics;
+  }
+
+  protected SchedulerApplicationAttempt getSchedulerAppAttempt(
+      ApplicationId appId) {
+    AbstractYarnScheduler yarnScheduler = (AbstractYarnScheduler)scheduler;
+    SchedulerApplication app = (SchedulerApplication)yarnScheduler
+        .getSchedulerApplications().get(appId);
+    if (app == null) {
+      return null;
+    }
+    return app.getCurrentAppAttempt();
+  }
+
+  public void trackApp(final ApplicationId appId, String oldAppId) {
     metrics.register("variable.app." + oldAppId + ".live.containers",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
-          return app.getLiveContainers().size();
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            SchedulerApplicationAttempt appAttempt =
+                getSchedulerAppAttempt(appId);
+            if (appAttempt != null) {
+              return appAttempt.getLiveContainers().size();
+            } else {
+              return 0;
+            }
+          }
         }
-      }
     );
     metrics.register("variable.app." + oldAppId + ".reserved.containers",
-      new Gauge<Integer>() {
-        @Override
-        public Integer getValue() {
-          SchedulerAppReport app = scheduler.getSchedulerAppInfo(appAttemptId);
-          return app.getReservedContainers().size();
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            SchedulerApplicationAttempt appAttempt =
+                getSchedulerAppAttempt(appId);
+            if (appAttempt != null) {
+              return appAttempt.getReservedContainers().size();
+            } else {
+              return 0;
+            }
+          }
         }
-      }
     );
   }
-  
-  public void untrackApp(ApplicationAttemptId appAttemptId,
-      String oldAppId) {
+
+  public void untrackApp(String oldAppId) {
     for (String m : appTrackedMetrics) {
       metrics.remove("variable.app." + oldAppId + "." + m);
     }
@@ -98,7 +250,392 @@ public abstract class SchedulerMetrics {
   public Set<String> getAppTrackedMetrics() {
     return appTrackedMetrics;
   }
+
   public Set<String> getQueueTrackedMetrics() {
     return queueTrackedMetrics;
   }
+
+  private void registerJvmMetrics() {
+    // add JVM gauges
+    metrics.register("variable.jvm.free.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().freeMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.max.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().maxMemory();
+          }
+        }
+    );
+    metrics.register("variable.jvm.total.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            return Runtime.getRuntime().totalMemory();
+          }
+        }
+    );
+  }
+
+  private void registerClusterResourceMetrics() {
+    metrics.register("variable.cluster.allocated.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.allocated.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedVirtualCores();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.memory",
+        new Gauge<Long>() {
+          @Override
+          public Long getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0L;
+            } else {
+              return scheduler.getRootQueueMetrics().getAvailableMB();
+            }
+          }
+        }
+    );
+    metrics.register("variable.cluster.available.vcores",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAvailableVirtualCores();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerContainerAppNumMetrics() {
+    metrics.register("variable.running.application",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAppsRunning();
+            }
+          }
+        }
+    );
+    metrics.register("variable.running.container",
+        new Gauge<Integer>() {
+          @Override
+          public Integer getValue() {
+            if (scheduler.getRootQueueMetrics() == null) {
+              return 0;
+            } else {
+              return scheduler.getRootQueueMetrics().getAllocatedContainers();
+            }
+          }
+        }
+    );
+  }
+
+  private void registerSchedulerMetrics() {
+    samplerLock.lock();
+    try {
+      // counters for scheduler operations
+      schedulerAllocateCounter = metrics.counter(
+          "counter.scheduler.operation.allocate");
+      schedulerHandleCounter = metrics.counter(
+          "counter.scheduler.operation.handle");
+      schedulerHandleCounterMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Counter counter = metrics.counter(
+            "counter.scheduler.operation.handle." + e);
+        schedulerHandleCounterMap.put(e, counter);
+      }
+      // timers for scheduler operations
+      int timeWindowSize = conf.getInt(
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE,
+          SLSConfiguration.METRICS_TIMER_WINDOW_SIZE_DEFAULT);
+      schedulerAllocateTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimer = new Timer(
+          new SlidingWindowReservoir(timeWindowSize));
+      schedulerHandleTimerMap = new HashMap<>();
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Timer timer = new Timer(new SlidingWindowReservoir(timeWindowSize));
+        schedulerHandleTimerMap.put(e, timer);
+      }
+      // histogram for scheduler operations (Samplers)
+      schedulerHistogramList = new ArrayList<>();
+      histogramTimerMap = new HashMap<>();
+      Histogram schedulerAllocateHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.allocate.timecost",
+          schedulerAllocateHistogram);
+      schedulerHistogramList.add(schedulerAllocateHistogram);
+      histogramTimerMap.put(schedulerAllocateHistogram, schedulerAllocateTimer);
+      Histogram schedulerHandleHistogram = new Histogram(
+          new SlidingWindowReservoir(SAMPLING_SIZE));
+      metrics.register("sampler.scheduler.operation.handle.timecost",
+          schedulerHandleHistogram);
+      schedulerHistogramList.add(schedulerHandleHistogram);
+      histogramTimerMap.put(schedulerHandleHistogram, schedulerHandleTimer);
+      for (SchedulerEventType e : SchedulerEventType.values()) {
+        Histogram histogram = new Histogram(
+            new SlidingWindowReservoir(SAMPLING_SIZE));
+        metrics.register(
+            "sampler.scheduler.operation.handle." + e + ".timecost",
+            histogram);
+        schedulerHistogramList.add(histogram);
+        histogramTimerMap.put(histogram, schedulerHandleTimerMap.get(e));
+      }
+    } finally {
+      samplerLock.unlock();
+    }
+  }
+
+  private void initMetricsCSVOutput() {
+    int timeIntervalMS = conf.getInt(
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS,
+        SLSConfiguration.METRICS_RECORD_INTERVAL_MS_DEFAULT);
+    File dir = new File(metricsOutputDir + "/metrics");
+    if(!dir.exists() && !dir.mkdirs()) {
+      LOG.error("Cannot create directory {}", dir.getAbsoluteFile());
+    }
+    final CsvReporter reporter = CsvReporter.forRegistry(metrics)
+        .formatFor(Locale.US)
+        .convertRatesTo(TimeUnit.SECONDS)
+        .convertDurationsTo(TimeUnit.MILLISECONDS)
+        .build(new File(metricsOutputDir + "/metrics"));
+    reporter.start(timeIntervalMS, TimeUnit.MILLISECONDS);
+  }
+
+  boolean isRunning() {
+    return running;
+  }
+
+  void setRunning(boolean running) {
+    this.running = running;
+  }
+
+  class HistogramsRunnable implements Runnable {
+    @Override
+    public void run() {
+      samplerLock.lock();
+      try {
+        for (Histogram histogram : schedulerHistogramList) {
+          Timer timer = histogramTimerMap.get(histogram);
+          histogram.update((int) timer.getSnapshot().getMean());
+        }
+      } finally {
+        samplerLock.unlock();
+      }
+    }
+  }
+
+  class MetricsLogRunnable implements Runnable {
+    private boolean firstLine = true;
+
+    MetricsLogRunnable() {
+      try {
+        metricsLogBW =
+            new BufferedWriter(new OutputStreamWriter(new FileOutputStream(
+                metricsOutputDir + "/realtimetrack.json"), "UTF-8"));
+        metricsLogBW.write("[");
+      } catch (IOException e) {
+        LOG.info(e.getMessage());
+      }
+    }
+
+    @Override
+    public void run() {
+      if(running) {
+        // call the web app to get the real-time tracking json
+        String trackingMetrics = web.generateRealTimeTrackingMetrics();
+        // output
+        try {
+          if(firstLine) {
+            metricsLogBW.write(trackingMetrics + EOL);
+            firstLine = false;
+          } else {
+            metricsLogBW.write("," + trackingMetrics + EOL);
+          }
+          metricsLogBW.flush();
+        } catch (IOException e) {
+          LOG.info(e.getMessage());
+        }
+      }
+    }
+  }
+
+  void tearDown() throws Exception {
+    if (metricsLogBW != null)  {
+      metricsLogBW.write("]");
+      metricsLogBW.close();
+    }
+
+    if (web != null) {
+      web.stop();
+    }
+
+    if (jobRuntimeLogBW != null) {
+      jobRuntimeLogBW.close();
+    }
+
+    if (pool != null) {
+      pool.shutdown();
+    }
+  }
+
+  void increaseSchedulerAllocationCounter() {
+    schedulerAllocateCounter.inc();
+  }
+
+  void increaseSchedulerHandleCounter(SchedulerEventType schedulerEventType) {
+    schedulerHandleCounter.inc();
+    schedulerHandleCounterMap.get(schedulerEventType).inc();
+  }
+
+  Timer getSchedulerAllocateTimer() {
+    return schedulerAllocateTimer;
+  }
+
+  Timer getSchedulerHandleTimer() {
+    return schedulerHandleTimer;
+  }
+
+  Timer getSchedulerHandleTimer(SchedulerEventType schedulerEventType) {
+    return schedulerHandleTimerMap.get(schedulerEventType);
+  }
+
+  private enum QueueMetric {
+    PENDING_MEMORY("pending.memory"),
+    PENDING_VCORES("pending.cores"),
+    ALLOCATED_MEMORY("allocated.memory"),
+    ALLOCATED_VCORES("allocated.cores");
+
+    private String value;
+
+    QueueMetric(String value) {
+      this.value = value;
+    }
+  }
+
+  private String getQueueMetricName(String queue, QueueMetric metric) {
+    return "counter.queue." + queue + "." + metric.value;
+  }
+
+  private void traceQueueIfNotTraced(String queue) {
+    queueLock.lock();
+    try {
+      if (!isTracked(queue)) {
+        trackQueue(queue);
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  void initQueueMetric(String queueName){
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+
+    for (QueueMetric queueMetric : QueueMetric.values()) {
+      String metricName = getQueueMetricName(queueName, queueMetric);
+      if (!counterMap.containsKey(metricName)) {
+        metrics.counter(metricName);
+        counterMap = metrics.getCounters();
+      }
+    }
+
+    traceQueueIfNotTraced(queueName);
+  }
+
+  void updateQueueMetrics(Resource pendingResource, Resource allocatedResource,
+      String queueName) {
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    for(QueueMetric metric : QueueMetric.values()) {
+      String metricName = getQueueMetricName(queueName, metric);
+      if (!counterMap.containsKey(metricName)) {
+        metrics.counter(metricName);
+        counterMap = metrics.getCounters();
+      }
+
+      if (metric == QueueMetric.PENDING_MEMORY) {
+        counterMap.get(metricName).inc(pendingResource.getMemorySize());
+      } else if (metric == QueueMetric.PENDING_VCORES) {
+        counterMap.get(metricName).inc(pendingResource.getVirtualCores());
+      } else if (metric == QueueMetric.ALLOCATED_MEMORY) {
+        counterMap.get(metricName).inc(allocatedResource.getMemorySize());
+      } else if (metric == QueueMetric.ALLOCATED_VCORES){
+        counterMap.get(metricName).inc(allocatedResource.getVirtualCores());
+      }
+    }
+
+    traceQueueIfNotTraced(queueName);
+  }
+
+  void updateQueueMetricsByRelease(Resource releaseResource, String queue) {
+    SortedMap<String, Counter> counterMap = metrics.getCounters();
+    String name = getQueueMetricName(queue, QueueMetric.ALLOCATED_MEMORY);
+    if (!counterMap.containsKey(name)) {
+      metrics.counter(name);
+      counterMap = metrics.getCounters();
+    }
+    counterMap.get(name).inc(-releaseResource.getMemorySize());
+
+    String vcoreMetric =
+        getQueueMetricName(queue, QueueMetric.ALLOCATED_VCORES);
+    if (!counterMap.containsKey(vcoreMetric)) {
+      metrics.counter(vcoreMetric);
+      counterMap = metrics.getCounters();
+    }
+    counterMap.get(vcoreMetric).inc(-releaseResource.getVirtualCores());
+  }
+
+  public void addTrackedApp(ApplicationId appId,
+      String oldAppId) {
+    trackApp(appId, oldAppId);
+  }
+
+  public void removeTrackedApp(String oldAppId) {
+    untrackApp(oldAppId);
+  }
+
+  public void addAMRuntime(ApplicationId appId, long traceStartTimeMS,
+      long traceEndTimeMS, long simulateStartTimeMS, long simulateEndTimeMS) {
+    try {
+      // write job runtime information
+      StringBuilder sb = new StringBuilder();
+      sb.append(appId).append(",").append(traceStartTimeMS).append(",")
+          .append(traceEndTimeMS).append(",").append(simulateStartTimeMS)
+          .append(",").append(simulateEndTimeMS);
+      jobRuntimeLogBW.write(sb.toString() + EOL);
+      jobRuntimeLogBW.flush();
+    } catch (IOException e) {
+      LOG.info(e.getMessage());
+    }
+  }
 }
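
[Side note, not part of the patch: the queue counters registered above follow the naming pattern "counter.queue.<queue>.<metric>". A hypothetical lookup against the registry exposed by getMetrics() could look like this; the queue name "sls_queue_1" and the schedulerMetrics variable are assumptions.]

  import com.codahale.metrics.Counter;
  import com.codahale.metrics.MetricRegistry;

  // schedulerMetrics is an already-initialized SchedulerMetrics instance
  MetricRegistry registry = schedulerMetrics.getMetrics();
  Counter allocatedMB =
      registry.getCounters().get("counter.queue.sls_queue_1.allocated.memory");
  long mb = (allocatedMB == null) ? 0 : allocatedMB.getCount();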

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
index 44629f5..7112b1a 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SchedulerWrapper.java
@@ -17,27 +17,16 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-import com.codahale.metrics.MetricRegistry;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
+@Private
+@Unstable
 public interface SchedulerWrapper {
+  SchedulerMetrics getSchedulerMetrics();
 
-       public MetricRegistry getMetrics();
-       public SchedulerMetrics getSchedulerMetrics();
-       public Set<String> getQueueSet();
-       public void setQueueSet(Set<String> queues);
-       public Set<String> getTrackedAppSet();
-       public void setTrackedAppSet(Set<String> apps);
-       public void addTrackedApp(ApplicationAttemptId appAttemptId,
-              String oldAppId);
-       public void removeTrackedApp(ApplicationAttemptId appAttemptId,
-                 String oldAppId);
-       public void addAMRuntime(ApplicationId appId,
-              long traceStartTimeMS, long traceEndTimeMS,
-              long simulateStartTimeMS, long simulateEndTimeMS);
+  Tracker getTracker();
 
+  String getRealQueueName(String queue) throws YarnException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
index d352904..19cfe88 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/TaskRunner.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.yarn.sls.scheduler;
 
-import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.Queue;
 import java.util.concurrent.DelayQueue;
@@ -27,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 
 @Private
 @Unstable
@@ -148,8 +146,8 @@ public class TaskRunner {
 
   @SuppressWarnings("unchecked")
   public void start() {
-    if (executor != null) {
-      throw new IllegalStateException("Already started");
+    if (executor != null && !executor.isTerminated()) {
+      throw new IllegalStateException("Executor already running");
     }
     DelayQueue preStartQueue = queue;
 
@@ -164,8 +162,9 @@ public class TaskRunner {
     }
   }
   
-  public void stop() {
+  public void stop() throws InterruptedException {
     executor.shutdownNow();
+    executor.awaitTermination(20, TimeUnit.SECONDS);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
new file mode 100644
index 0000000..42a5c3c
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/Tracker.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import java.util.Set;
+
+@Private
+@Unstable
+public class Tracker {
+  private Set<String> queueSet;
+  private Set<String> trackedAppSet;
+
+  public void setQueueSet(Set<String> queues) {
+    queueSet = queues;
+  }
+
+  public Set<String> getQueueSet() {
+    return queueSet;
+  }
+
+  public void setTrackedAppSet(Set<String> apps) {
+    trackedAppSet = apps;
+  }
+
+  public Set<String> getTrackedAppSet() {
+    return trackedAppSet;
+  }
+}
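
[Side note, not part of the patch: Tracker simply carries the queue/app sets that used to live behind accessors on SchedulerWrapper. A hypothetical caller, with a made-up queue name:]

  import java.util.Arrays;
  import java.util.HashSet;

  // scheduler is assumed to be one of the SLS wrappers implementing SchedulerWrapper
  Tracker tracker = ((SchedulerWrapper) scheduler).getTracker();
  tracker.setQueueSet(new HashSet<>(Arrays.asList("root.sls_queue_1")));
  tracker.setTrackedAppSet(new HashSet<>());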

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
new file mode 100644
index 0000000..3ed81e1
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJob.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.distribution.LogNormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskStatus.State;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.*;
+import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME;
+
+/**
+ * Generates random task data for a synthetic job.
+ */
+public class SynthJob implements JobStory {
+
+  @SuppressWarnings("StaticVariableName")
+  private static Log LOG = LogFactory.getLog(SynthJob.class);
+
+  private final Configuration conf;
+  private final int id;
+
+  @SuppressWarnings("ConstantName")
+  private static final AtomicInteger sequence = new AtomicInteger(0);
+  private final String name;
+  private final String queueName;
+  private final SynthJobClass jobClass;
+
+  // job timing
+  private final long submitTime;
+  private final long duration;
+  private final long deadline;
+
+  private final int numMapTasks;
+  private final int numRedTasks;
+  private final long mapMaxMemory;
+  private final long reduceMaxMemory;
+  private final long mapMaxVcores;
+  private final long reduceMaxVcores;
+  private final long[] mapRuntime;
+  private final float[] reduceRuntime;
+  private long totMapRuntime;
+  private long totRedRuntime;
+
+  public SynthJob(JDKRandomGenerator rand, Configuration conf,
+      SynthJobClass jobClass, long actualSubmissionTime) {
+
+    this.conf = conf;
+    this.jobClass = jobClass;
+
+    this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
+    this.numMapTasks = jobClass.getMtasks();
+    this.numRedTasks = jobClass.getRtasks();
+
+    // sample memory distributions, correct for sub-minAlloc sizes
+    long tempMapMaxMemory = jobClass.getMapMaxMemory();
+    this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
+        ? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
+    long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
+    this.reduceMaxMemory =
+            tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
+            ? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
+
+    // sample vcores distributions, correct for sub-minAlloc sizes
+    long tempMapMaxVCores = jobClass.getMapMaxVcores();
+    this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
+        ? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
+    long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
+    this.reduceMaxVcores =
+        tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
+            ? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
+
+    if (numMapTasks > 0) {
+      conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
+      conf.set(MRJobConfig.MAP_JAVA_OPTS,
+          "-Xmx" + (this.mapMaxMemory - 100) + "m");
+    }
+
+    if (numRedTasks > 0) {
+      conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
+      conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
+          "-Xmx" + (this.reduceMaxMemory - 100) + "m");
+    }
+
+    boolean hasDeadline =
+        (rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
+
+    LogNormalDistribution deadlineFactor =
+        SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
+            jobClass.jobClass.deadline_factor_stddev);
+
+    double deadlineFactorSample =
+        (deadlineFactor != null) ? deadlineFactor.sample() : -1;
+
+    this.queueName = jobClass.workload.getQueueName();
+
+    this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
+
+    this.deadline =
+        hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
+            + (long) Math.ceil(deadlineFactorSample * duration) : -1;
+
+    conf.set(QUEUE_NAME, queueName);
+
+    // name and initialize job randomness
+    final long seed = rand.nextLong();
+    rand.setSeed(seed);
+    id = sequence.getAndIncrement();
+
+    name = String.format(jobClass.getClassName() + "_%06d", id);
+    LOG.debug(name + " (" + seed + ")");
+
+    LOG.info("JOB TIMING: job: " + name + " submission:" + submitTime
+        + " deadline:" + deadline + " duration:" + duration
+        + " deadline-submission: " + (deadline - submitTime));
+
+    // generate map and reduce runtimes
+    mapRuntime = new long[numMapTasks];
+    for (int i = 0; i < numMapTasks; i++) {
+      mapRuntime[i] = jobClass.getMapTimeSample();
+      totMapRuntime += mapRuntime[i];
+    }
+    reduceRuntime = new float[numRedTasks];
+    for (int i = 0; i < numRedTasks; i++) {
+      reduceRuntime[i] = jobClass.getReduceTimeSample();
+      totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
+    }
+  }
+
+  public boolean hasDeadline() {
+    return deadline > 0;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public String getUser() {
+    return jobClass.getUserName();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return new JobID("job_mock_" + name, id);
+  }
+
+  @Override
+  public Values getOutcome() {
+    return Values.SUCCESS;
+  }
+
+  @Override
+  public long getSubmissionTime() {
+    return submitTime;
+  }
+
+  @Override
+  public int getNumberMaps() {
+    return numMapTasks;
+  }
+
+  @Override
+  public int getNumberReduces() {
+    return numRedTasks;
+  }
+
+  @Override
+  public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
+    switch (taskType) {
+    case MAP:
+      return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
+    case REDUCE:
+      return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
+    default:
+      throw new IllegalArgumentException("Not interested");
+    }
+  }
+
+  @Override
+  public InputSplit[] getInputSplits() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
+      int taskAttemptNumber) {
+    switch (taskType) {
+    case MAP:
+      return new MapTaskAttemptInfo(State.SUCCEEDED,
+          getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
+
+    case REDUCE:
+      // We assume uniform split between pull/sort/reduce
+      // aligned with naive progress reporting assumptions
+      return new ReduceTaskAttemptInfo(State.SUCCEEDED,
+          getTaskInfo(taskType, taskNumber),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)),
+          (long) Math.round((reduceRuntime[taskNumber] / 3)), null);
+
+    default:
+      break;
+    }
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
+      int taskAttemptNumber, int locality) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public org.apache.hadoop.mapred.JobConf getJobConf() {
+    return new JobConf(conf);
+  }
+
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+
+  @Override
+  public String toString() {
+    return "SynthJob [\n" + "  workload=" + jobClass.getWorkload().getId()
+        + "\n" + "  jobClass="
+        + jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
+        + "  conf=" + conf + ",\n" + "  id=" + id + ",\n" + "  name=" + name
+        + ",\n" + "  mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
+        + "  reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
+        + "  submitTime=" + submitTime + ",\n" + "  numMapTasks=" + numMapTasks
+        + ",\n" + "  numRedTasks=" + numRedTasks + ",\n" + "  mapMaxMemory="
+        + mapMaxMemory + ",\n" + "  reduceMaxMemory=" + reduceMaxMemory + ",\n"
+        + "  queueName=" + queueName + "\n" + "]";
+  }
+
+  public SynthJobClass getJobClass() {
+    return jobClass;
+  }
+
+  public long getTotalSlotTime() {
+    return totMapRuntime + totRedRuntime;
+  }
+
+  public long getDuration() {
+    return duration;
+  }
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJob)) {
+      return false;
+    }
+    SynthJob o = (SynthJob) other;
+    return Arrays.equals(mapRuntime, o.mapRuntime)
+        && Arrays.equals(reduceRuntime, o.reduceRuntime)
+        && submitTime == o.submitTime && numMapTasks == o.numMapTasks
+        && numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
+        && reduceMaxMemory == o.reduceMaxMemory
+        && mapMaxVcores == o.mapMaxVcores
+        && reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
+        && jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
+        && totRedRuntime == o.totRedRuntime;
+  }
+
+  @Override
+  public int hashCode() {
+    // could have a bad distr; investigate if a relevant use case exists
+    return jobClass.hashCode() * (int) submitTime;
+  }
+}
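
[Side note, not part of the patch: the deadline computed in the SynthJob constructor is the submission time converted to milliseconds plus ceil(deadlineFactorSample * duration), or -1 when the job draws no reservation. With made-up numbers:]

  import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.SECONDS;

  long actualSubmissionTime = 30;                      // hypothetical trace value, in seconds
  long duration = MILLISECONDS.convert(600, SECONDS);  // a 600-second job
  double deadlineFactorSample = 1.5;                   // hypothetical log-normal sample
  long submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
  long deadline = submitTime + (long) Math.ceil(deadlineFactorSample * duration);
  // 30,000 + ceil(1.5 * 600,000) = 930,000 ms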

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
new file mode 100644
index 0000000..439698f
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthJobClass.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.math3.distribution.AbstractRealDistribution;
+import org.apache.commons.math3.distribution.LogNormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.JobClass;
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
+
+/**
+ * This class represents a class of jobs. It is used to generate an
+ * individual job by picking random durations, task counts, container sizes,
+ * etc.
+ */
+public class SynthJobClass {
+
+  private final JDKRandomGenerator rand;
+  private final LogNormalDistribution dur;
+  private final LogNormalDistribution mapRuntime;
+  private final LogNormalDistribution redRuntime;
+  private final LogNormalDistribution mtasks;
+  private final LogNormalDistribution rtasks;
+  private final LogNormalDistribution mapMem;
+  private final LogNormalDistribution redMem;
+  private final LogNormalDistribution mapVcores;
+  private final LogNormalDistribution redVcores;
+
+  private final Trace trace;
+  @SuppressWarnings("VisibilityModifier")
+  protected final SynthWorkload workload;
+  @SuppressWarnings("VisibilityModifier")
+  protected final JobClass jobClass;
+
+  public SynthJobClass(JDKRandomGenerator rand, Trace trace,
+      SynthWorkload workload, int classId) {
+
+    this.trace = trace;
+    this.workload = workload;
+    this.rand = new JDKRandomGenerator();
+    this.rand.setSeed(rand.nextLong());
+    jobClass = trace.workloads.get(workload.getId()).job_classes.get(classId);
+
+    this.dur = SynthUtils.getLogNormalDist(rand, jobClass.dur_avg,
+        jobClass.dur_stddev);
+    this.mapRuntime = SynthUtils.getLogNormalDist(rand, jobClass.mtime_avg,
+        jobClass.mtime_stddev);
+    this.redRuntime = SynthUtils.getLogNormalDist(rand, jobClass.rtime_avg,
+        jobClass.rtime_stddev);
+    this.mtasks = SynthUtils.getLogNormalDist(rand, jobClass.mtasks_avg,
+        jobClass.mtasks_stddev);
+    this.rtasks = SynthUtils.getLogNormalDist(rand, jobClass.rtasks_avg,
+        jobClass.rtasks_stddev);
+
+    this.mapMem = SynthUtils.getLogNormalDist(rand, jobClass.map_max_memory_avg,
+        jobClass.map_max_memory_stddev);
+    this.redMem = SynthUtils.getLogNormalDist(rand,
+        jobClass.reduce_max_memory_avg, jobClass.reduce_max_memory_stddev);
+    this.mapVcores = SynthUtils.getLogNormalDist(rand,
+        jobClass.map_max_vcores_avg, jobClass.map_max_vcores_stddev);
+    this.redVcores = SynthUtils.getLogNormalDist(rand,
+        jobClass.reduce_max_vcores_avg, jobClass.reduce_max_vcores_stddev);
+  }
+
+  public JobStory getJobStory(Configuration conf, long actualSubmissionTime) {
+    return new SynthJob(rand, conf, this, actualSubmissionTime);
+  }
+
+  @Override
+  public String toString() {
+    return "SynthJobClass [workload=" + workload.getName() + ", class="
+        + jobClass.class_name + " job_count=" + jobClass.class_weight + ", dur="
+        + ((dur != null) ? dur.getNumericalMean() : 0) + ", mapRuntime="
+        + ((mapRuntime != null) ? mapRuntime.getNumericalMean() : 0)
+        + ", redRuntime="
+        + ((redRuntime != null) ? redRuntime.getNumericalMean() : 0)
+        + ", mtasks=" + ((mtasks != null) ? mtasks.getNumericalMean() : 0)
+        + ", rtasks=" + ((rtasks != null) ? rtasks.getNumericalMean() : 0)
+        + ", chance_of_reservation=" + jobClass.chance_of_reservation + "]\n";
+
+  }
+
+  public double getClassWeight() {
+    return jobClass.class_weight;
+  }
+
+  public long getDur() {
+    return genLongSample(dur);
+  }
+
+  public int getMtasks() {
+    return genIntSample(mtasks);
+  }
+
+  public int getRtasks() {
+    return genIntSample(rtasks);
+  }
+
+  public long getMapMaxMemory() {
+    return genLongSample(mapMem);
+  }
+
+  public long getReduceMaxMemory() {
+    return genLongSample(redMem);
+  }
+
+  public long getMapMaxVcores() {
+    return genLongSample(mapVcores);
+  }
+
+  public long getReduceMaxVcores() {
+    return genLongSample(redVcores);
+  }
+
+  public SynthWorkload getWorkload() {
+    return workload;
+  }
+
+  public int genIntSample(AbstractRealDistribution dist) {
+    if (dist == null) {
+      return 0;
+    }
+    double baseSample = dist.sample();
+    if (baseSample < 0) {
+      baseSample = 0;
+    }
+    return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
+  }
+
+  public long genLongSample(AbstractRealDistribution dist) {
+    return dist != null ? (long) Math.ceil(dist.sample()) : 0;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthJobClass)) {
+      return false;
+    }
+    SynthJobClass o = (SynthJobClass) other;
+    return workload.equals(o.workload);
+  }
+
+  @Override
+  public int hashCode() {
+    return workload.hashCode() * workload.getId();
+  }
+
+  public String getClassName() {
+    return jobClass.class_name;
+  }
+
+  public long getMapTimeSample() {
+    return genLongSample(mapRuntime);
+  }
+
+  public long getReduceTimeSample() {
+    return genLongSample(redRuntime);
+  }
+
+  public String getUserName() {
+    return jobClass.user_name;
+  }
+}
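
A note on the integer sampling above: genIntSample clamps negative draws to zero, rounds up, and masks the result with Integer.MAX_VALUE, so the returned value is always a non-negative int (an extremely large draw wraps within the positive int range instead of overflowing to a negative value). A minimal standalone sketch of that clamp-and-mask step, with made-up sample values purely for illustration:

  // Illustrative only: mirrors the clamp-and-mask in SynthJobClass#genIntSample.
  public class GenIntSampleSketch {

    static int clampToNonNegativeInt(double baseSample) {
      if (baseSample < 0) {
        baseSample = 0;
      }
      // keep only the low 31 bits, which guarantees a non-negative int
      return (int) (Integer.MAX_VALUE & (long) Math.ceil(baseSample));
    }

    public static void main(String[] args) {
      System.out.println(clampToNonNegativeInt(-3.2));  // 0
      System.out.println(clampToNonNegativeInt(41.1));  // 42
      System.out.println(clampToNonNegativeInt(5.0e9)); // wraps within [0, Integer.MAX_VALUE]
    }
  }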

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
new file mode 100644
index 0000000..14b0371
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthTraceJobProducer.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.rumen.JobStory;
+import org.apache.hadoop.tools.rumen.JobStoryProducer;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
+import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES;
+
+/**
+ * This is a JobStoryProducer that operates from a distribution of different
+ * workloads. The .json input file is used to determine how many jobs there
+ * are, their size, the number of maps/reducers and their durations, as well
+ * as the temporal distribution of submissions. For each parameter we control
+ * the average and standard deviation, and generate values via normal or
+ * log-normal distributions.
+ */
+public class SynthTraceJobProducer implements JobStoryProducer {
+
+  @SuppressWarnings("StaticVariableName")
+  private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class);
+
+  private final Configuration conf;
+  private final AtomicInteger numJobs;
+  private final Trace trace;
+  private final long seed;
+
+  private int totalWeight;
+  private final List<Double> weightList;
+  private final Map<Integer, SynthWorkload> workloads;
+
+  private final Queue<StoryParams> listStoryParams;
+
+  private final JDKRandomGenerator rand;
+
+  public static final String SLS_SYNTHETIC_TRACE_FILE =
+      "sls.synthetic" + ".trace_file";
+
+  public SynthTraceJobProducer(Configuration conf) throws IOException {
+    this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
+  }
+
+  public SynthTraceJobProducer(Configuration conf, Path path)
+      throws IOException {
+
+    LOG.info("SynthTraceJobProducer");
+
+    this.conf = conf;
+    this.rand = new JDKRandomGenerator();
+    workloads = new HashMap<Integer, SynthWorkload>();
+    weightList = new ArrayList<Double>();
+
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.configure(INTERN_FIELD_NAMES, true);
+    mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    FileSystem ifs = path.getFileSystem(conf);
+    FSDataInputStream fileIn = ifs.open(path);
+
+    this.trace = mapper.readValue(fileIn, Trace.class);
+    seed = trace.rand_seed;
+    rand.setSeed(seed);
+
+    this.numJobs = new AtomicInteger(trace.num_jobs);
+
+    for (int workloadId = 0; workloadId < trace.workloads
+        .size(); workloadId++) {
+      SynthWorkload workload = new SynthWorkload(workloadId, trace);
+      for (int classId =
+          0; classId < trace.workloads.get(workloadId).job_classes
+              .size(); classId++) {
+        SynthJobClass cls = new SynthJobClass(rand, trace, workload, classId);
+        workload.add(cls);
+      }
+      workloads.put(workloadId, workload);
+    }
+
+    for (int i = 0; i < workloads.size(); i++) {
+      double w = workloads.get(i).getWorkloadWeight();
+      totalWeight += w;
+      weightList.add(w);
+    }
+
+    // create priority queue to keep start-time sorted
+    listStoryParams =
+        new PriorityQueue<>(10, new Comparator<StoryParams>() {
+          @Override
+          public int compare(StoryParams o1, StoryParams o2) {
+            long value = o2.actualSubmissionTime - o1.actualSubmissionTime;
+            if ((int)value != value) {
+              throw new ArithmeticException("integer overflow");
+            }
+            return (int)value;
+          }
+        });
+
+    // initialize it
+    createStoryParams();
+    LOG.info("Generated " + listStoryParams.size() + " deadlines for "
+        + this.numJobs.get() + " jobs ");
+  }
+
+  public long getSeed() {
+    return seed;
+  }
+
+  public int getNodesPerRack() {
+    return trace.nodes_per_rack < 1 ? 1 : trace.nodes_per_rack;
+  }
+
+  public int getNumNodes() {
+    return trace.num_nodes;
+  }
+
+  /**
+   * Class used to parse a trace configuration file.
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  @XmlRootElement
+  public static class Trace {
+    @JsonProperty("description")
+    String description;
+    @JsonProperty("num_nodes")
+    int num_nodes;
+    @JsonProperty("nodes_per_rack")
+    int nodes_per_rack;
+    @JsonProperty("num_jobs")
+    int num_jobs;
+
+    // in sec (selects a portion of the time_distribution)
+    @JsonProperty("rand_seed")
+    long rand_seed;
+    @JsonProperty("workloads")
+    List<Workload> workloads;
+
+  }
+
+  /**
+   * Class used to parse a workload from file.
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class Workload {
+    @JsonProperty("workload_name")
+    String workload_name;
+    // used to change probability this workload is picked for each job
+    @JsonProperty("workload_weight")
+    double workload_weight;
+    @JsonProperty("queue_name")
+    String queue_name;
+    @JsonProperty("job_classes")
+    List<JobClass> job_classes;
+    @JsonProperty("time_distribution")
+    List<TimeSample> time_distribution;
+  }
+
+  /**
+   * Class used to parse a job class from file.
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class JobClass {
+
+    @JsonProperty("class_name")
+    String class_name;
+    @JsonProperty("user_name")
+    String user_name;
+
+    // used to change probability this class is chosen
+    @JsonProperty("class_weight")
+    double class_weight;
+
+    // reservation related params
+    @JsonProperty("chance_of_reservation")
+    double chance_of_reservation;
+    @JsonProperty("deadline_factor_avg")
+    double deadline_factor_avg;
+    @JsonProperty("deadline_factor_stddev")
+    double deadline_factor_stddev;
+
+    // durations in sec
+    @JsonProperty("dur_avg")
+    double dur_avg;
+    @JsonProperty("dur_stddev")
+    double dur_stddev;
+    @JsonProperty("mtime_avg")
+    double mtime_avg;
+    @JsonProperty("mtime_stddev")
+    double mtime_stddev;
+    @JsonProperty("rtime_avg")
+    double rtime_avg;
+    @JsonProperty("rtime_stddev")
+    double rtime_stddev;
+
+    // number of tasks
+    @JsonProperty("mtasks_avg")
+    double mtasks_avg;
+    @JsonProperty("mtasks_stddev")
+    double mtasks_stddev;
+    @JsonProperty("rtasks_avg")
+    double rtasks_avg;
+    @JsonProperty("rtasks_stddev")
+    double rtasks_stddev;
+
+    // memory in MB
+    @JsonProperty("map_max_memory_avg")
+    long map_max_memory_avg;
+    @JsonProperty("map_max_memory_stddev")
+    double map_max_memory_stddev;
+    @JsonProperty("reduce_max_memory_avg")
+    long reduce_max_memory_avg;
+    @JsonProperty("reduce_max_memory_stddev")
+    double reduce_max_memory_stddev;
+
+    // vcores
+    @JsonProperty("map_max_vcores_avg")
+    long map_max_vcores_avg;
+    @JsonProperty("map_max_vcores_stddev")
+    double map_max_vcores_stddev;
+    @JsonProperty("reduce_max_vcores_avg")
+    long reduce_max_vcores_avg;
+    @JsonProperty("reduce_max_vcores_stddev")
+    double reduce_max_vcores_stddev;
+
+  }
+
+  /**
+   * This is used to define time-varying probability of a job start-time (e.g.,
+   * to simulate daily patterns).
+   */
+  @SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
+  public static class TimeSample {
+    // in sec
+    @JsonProperty("time")
+    int time;
+    @JsonProperty("weight")
+    double jobs;
+  }
+
+  static class StoryParams {
+    private SynthJobClass pickedJobClass;
+    private long actualSubmissionTime;
+
+    StoryParams(SynthJobClass pickedJobClass, long actualSubmissionTime) {
+      this.pickedJobClass = pickedJobClass;
+      this.actualSubmissionTime = actualSubmissionTime;
+    }
+  }
+
+
+  void createStoryParams() {
+
+    for (int i = 0; i < numJobs.get(); i++) {
+      int workload = SynthUtils.getWeighted(weightList, rand);
+      SynthWorkload pickedWorkload = workloads.get(workload);
+      long jobClass =
+          SynthUtils.getWeighted(pickedWorkload.getWeightList(), rand);
+      SynthJobClass pickedJobClass =
+          pickedWorkload.getClassList().get((int) jobClass);
+      long actualSubmissionTime = pickedWorkload.getBaseSubmissionTime(rand);
+      // long actualSubmissionTime = (i + 1) * 10;
+      listStoryParams
+          .add(new StoryParams(pickedJobClass, actualSubmissionTime));
+    }
+  }
+
+  @Override
+  public JobStory getNextJob() throws IOException {
+    if (numJobs.decrementAndGet() < 0) {
+      return null;
+    }
+    StoryParams storyParams = listStoryParams.poll();
+    return storyParams.pickedJobClass.getJobStory(conf,
+        storyParams.actualSubmissionTime);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public String toString() {
+    return "SynthTraceJobProducer [ conf=" + conf + ", numJobs=" + numJobs
+        + ", weightList=" + weightList + ", r=" + rand + ", totalWeight="
+        + totalWeight + ", workloads=" + workloads + "]";
+  }
+
+  public int getNumJobs() {
+    return trace.num_jobs;
+  }
+
+}
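
To make the expected .json layout concrete, here is a small sketch (not part of the patch) that writes an assumed single-workload, single-class trace to a local file and builds a SynthTraceJobProducer from it. Only the property names come from the Trace/Workload/JobClass/TimeSample classes above; every value is illustrative:

  import java.io.File;
  import java.io.FileWriter;
  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer;

  public class SynthTraceSketch {
    public static void main(String[] args) throws IOException {
      // Assumed minimal trace: one workload, one job class, one time bucket.
      String trace = "{\n"
          + "  \"description\": \"tiny synthetic trace\",\n"
          + "  \"num_nodes\": 4, \"nodes_per_rack\": 2,\n"
          + "  \"num_jobs\": 10, \"rand_seed\": 2,\n"
          + "  \"workloads\": [{\n"
          + "    \"workload_name\": \"w1\", \"workload_weight\": 1.0,\n"
          + "    \"queue_name\": \"default\",\n"
          + "    \"job_classes\": [{\n"
          + "      \"class_name\": \"c1\", \"user_name\": \"foo\", \"class_weight\": 1.0,\n"
          + "      \"dur_avg\": 60, \"dur_stddev\": 5,\n"
          + "      \"mtime_avg\": 10, \"mtime_stddev\": 2,\n"
          + "      \"rtime_avg\": 20, \"rtime_stddev\": 4,\n"
          + "      \"mtasks_avg\": 5, \"mtasks_stddev\": 1,\n"
          + "      \"rtasks_avg\": 2, \"rtasks_stddev\": 1,\n"
          + "      \"map_max_memory_avg\": 1024, \"reduce_max_memory_avg\": 2048,\n"
          + "      \"map_max_vcores_avg\": 1, \"reduce_max_vcores_avg\": 1\n"
          + "    }],\n"
          + "    \"time_distribution\": [\n"
          + "      {\"time\": 1, \"weight\": 100},\n"
          + "      {\"time\": 60, \"weight\": 0}\n"
          + "    ]\n"
          + "  }]\n"
          + "}\n";

      File f = File.createTempFile("synth-trace", ".json");
      try (FileWriter out = new FileWriter(f)) {
        out.write(trace);
      }

      // Parse the trace and inspect what it describes; getNextJob() would then
      // hand back one JobStory per generated job, with submission times drawn
      // from the time_distribution buckets.
      SynthTraceJobProducer producer =
          new SynthTraceJobProducer(new Configuration(), new Path(f.getAbsolutePath()));
      System.out.println("jobs=" + producer.getNumJobs()
          + " nodes=" + producer.getNumNodes()
          + " nodesPerRack=" + producer.getNodesPerRack()
          + " seed=" + producer.getSeed());
      producer.close();
    }
  }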

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java
new file mode 100644
index 0000000..a7f8c7f
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthUtils.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.commons.math3.distribution.LogNormalDistribution;
+import org.apache.commons.math3.distribution.NormalDistribution;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+
+import java.util.Collection;
+import java.util.Random;
+
+/**
+ * Utils for the Synthetic generator.
+ */
+public final class SynthUtils {
+
+  private SynthUtils(){
+    //class is not meant to be instantiated
+  }
+
+  public static int getWeighted(Collection<Double> weights, Random rr) {
+
+    double totalWeight = 0;
+    for (Double i : weights) {
+      totalWeight += i;
+    }
+
+    double rand = rr.nextDouble() * totalWeight;
+
+    double cur = 0;
+    int ind = 0;
+    for (Double i : weights) {
+      cur += i;
+      if (cur > rand) {
+        break;
+      }
+      ind++;
+    }
+
+    return ind;
+  }
+
+  public static NormalDistribution getNormalDist(JDKRandomGenerator rand,
+      double average, double stdDev) {
+
+    if (average <= 0) {
+      return null;
+    }
+
+    // set default for missing param
+    if (stdDev == 0) {
+      stdDev = average / 6;
+    }
+
+    NormalDistribution ret = new NormalDistribution(average, stdDev,
+        NormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
+    ret.reseedRandomGenerator(rand.nextLong());
+    return ret;
+  }
+
+  public static LogNormalDistribution getLogNormalDist(JDKRandomGenerator rand,
+      double mean, double stdDev) {
+
+    if (mean <= 0) {
+      return null;
+    }
+
+    // set default for missing param
+    if (stdDev == 0) {
+      stdDev = mean / 6;
+    }
+
+    // derive lognormal parameters for X = LogNormal(mu, sigma)
+    // sigma^2 = ln (1+Var[X]/(E[X])^2)
+    // mu = ln(E[X]) - 1/2 * sigma^2
+    double var = stdDev * stdDev;
+    double sigmasq = Math.log1p(var / (mean * mean));
+    double sigma = Math.sqrt(sigmasq);
+    double mu = Math.log(mean) - 0.5 * sigmasq;
+
+    LogNormalDistribution ret = new LogNormalDistribution(mu, sigma,
+        LogNormalDistribution.DEFAULT_INVERSE_ABSOLUTE_ACCURACY);
+    ret.reseedRandomGenerator(rand.nextLong());
+    return ret;
+  }
+}
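
A quick sanity check on the log-normal parameterization above: for X = LogNormal(mu, sigma), mean(X) = exp(mu + sigma^2/2) and Var(X) = (exp(sigma^2) - 1) * exp(2*mu + sigma^2), so the derived mu/sigma should reproduce the requested average and stddev^2. Commons-math3 exposes both moments analytically; the class and values below are assumed for illustration:

  import org.apache.commons.math3.distribution.LogNormalDistribution;
  import org.apache.commons.math3.random.JDKRandomGenerator;
  import org.apache.hadoop.yarn.sls.synthetic.SynthUtils;

  public class LogNormalParamCheck {
    public static void main(String[] args) {
      double mean = 600.0;   // e.g. an average duration in seconds
      double stdDev = 100.0;

      LogNormalDistribution d =
          SynthUtils.getLogNormalDist(new JDKRandomGenerator(), mean, stdDev);

      // analytic moments of the fitted distribution
      System.out.println(d.getNumericalMean());     // ~600.0
      System.out.println(d.getNumericalVariance()); // ~10000.0 (= 100.0^2)
    }
  }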

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
new file mode 100644
index 0000000..9e5fd4e
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/SynthWorkload.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
+
+import org.apache.hadoop.yarn.sls.synthetic.SynthTraceJobProducer.Trace;
+
+import java.util.*;
+
+/**
+ * This class represents a workload (made up of multiple SynthJobClass(es)). It
+ * also stores the temporal distribution of jobs in this workload.
+ */
+public class SynthWorkload {
+
+  private final int id;
+  private final List<SynthJobClass> classList;
+  private final Trace trace;
+  private final SortedMap<Integer, Double> timeWeights;
+
+  public SynthWorkload(int identifier, Trace inTrace) {
+    classList = new ArrayList<SynthJobClass>();
+    this.id = identifier;
+    this.trace = inTrace;
+    timeWeights = new TreeMap<Integer, Double>();
+    for (SynthTraceJobProducer.TimeSample ts : trace.workloads
+        .get(id).time_distribution) {
+      timeWeights.put(ts.time, ts.jobs);
+    }
+  }
+
+  public boolean add(SynthJobClass s) {
+    return classList.add(s);
+  }
+
+  public List<Double> getWeightList() {
+    ArrayList<Double> ret = new ArrayList<Double>();
+    for (SynthJobClass s : classList) {
+      ret.add(s.getClassWeight());
+    }
+    return ret;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SynthWorkload)) {
+      return false;
+    }
+    // assume ID determines job classes by construction
+    return getId() == ((SynthWorkload) other).getId();
+  }
+
+  @Override
+  public int hashCode() {
+    return getId();
+  }
+
+  @Override
+  public String toString() {
+    return "SynthWorkload " + trace.workloads.get(id).workload_name + "[\n"
+        + classList + "]\n";
+  }
+
+  public String getName() {
+    return trace.workloads.get(id).workload_name;
+  }
+
+  public double getWorkloadWeight() {
+    return trace.workloads.get(id).workload_weight;
+  }
+
+  public String getQueueName() {
+    return trace.workloads.get(id).queue_name;
+  }
+
+  public long getBaseSubmissionTime(Random rand) {
+
+    // pick the "bucket" for this start time based on the weights
+    int position = SynthUtils.getWeighted(timeWeights.values(), rand);
+
+    int[] time = new int[timeWeights.keySet().size()];
+    int index = 0;
+    for (Integer i : timeWeights.keySet()) {
+      time[index++] = i;
+    }
+
+    // uniformly pick a time between start and end time of this bucket
+    int startRange = time[position];
+    int endRange = startRange;
+    // if there is no subsequent bucket pick startRange
+    if (position < timeWeights.keySet().size() - 1) {
+      endRange = time[position + 1];
+      return startRange + rand.nextInt((endRange - startRange));
+    } else {
+      return startRange;
+    }
+  }
+
+  public List<SynthJobClass> getClassList() {
+    return classList;
+  }
+
+}
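
The submission-time logic above samples in two steps: pick a bucket of the time_distribution proportionally to its weight, then pick uniformly between that bucket's start time and the next bucket's start time (a trailing zero-weight entry just closes the last interval). A standalone sketch of the same two-step sampling, using an assumed distribution (heavy load in the first hour, lighter in the second), not taken from the patch:

  import java.util.Arrays;
  import java.util.List;
  import java.util.Random;

  import org.apache.hadoop.yarn.sls.synthetic.SynthUtils;

  public class SubmissionTimeSketch {
    public static void main(String[] args) {
      // Assumed time_distribution: buckets start at 0s, 3600s, 7200s.
      int[] bucketStart = {0, 3600, 7200};
      List<Double> weights = Arrays.asList(3.0, 1.0, 0.0);

      Random rand = new Random(12345);
      for (int i = 0; i < 5; i++) {
        int bucket = SynthUtils.getWeighted(weights, rand);
        int start = bucketStart[bucket];
        int end = bucket < bucketStart.length - 1 ? bucketStart[bucket + 1] : start;
        long submission = end > start ? start + rand.nextInt(end - start) : start;
        System.out.println("bucket=" + bucket + " submissionTime=" + submission + "s");
      }
    }
  }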

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java
new file mode 100644
index 0000000..e069610
--- /dev/null
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/synthetic/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes comprising the synthetic load generator for SLS.
+ */
+package org.apache.hadoop.yarn.sls.synthetic;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d894f910/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
index f1b4f07..dbc2dab 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/utils/SLSUtils.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
@@ -37,12 +39,11 @@ import org.apache.hadoop.tools.rumen.JobTraceReader;
 import org.apache.hadoop.tools.rumen.LoggedJob;
 import org.apache.hadoop.tools.rumen.LoggedTask;
 import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.map.ObjectMapper;
 
 @Private
 @Unstable
 public class SLSUtils {
+  public final static String DEFAULT_JOB_TYPE = "mapreduce";
 
   // hostname includes the network path and the host name. for example
   // "/default-rack/hostFoo" or "/coreSwitchA/TORSwitchB/hostBar".
@@ -100,22 +101,15 @@ public class SLSUtils {
    */
   public static Set<String> parseNodesFromSLSTrace(String jobTrace)
           throws IOException {
-    Set<String> nodeSet = new HashSet<String>();
+    Set<String> nodeSet = new HashSet<>();
     JsonFactory jsonF = new JsonFactory();
     ObjectMapper mapper = new ObjectMapper();
     Reader input =
         new InputStreamReader(new FileInputStream(jobTrace), "UTF-8");
     try {
-      Iterator<Map> i = mapper.readValues(
-              jsonF.createJsonParser(input), Map.class);
+      Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
       while (i.hasNext()) {
-        Map jsonE = i.next();
-        List tasks = (List) jsonE.get("job.tasks");
-        for (Object o : tasks) {
-          Map jsonTask = (Map) o;
-          String hostname = jsonTask.get("container.host").toString();
-          nodeSet.add(hostname);
-        }
+        addNodes(nodeSet, i.next());
       }
     } finally {
       input.close();
@@ -123,6 +117,29 @@ public class SLSUtils {
     return nodeSet;
   }
 
+  private static void addNodes(Set<String> nodeSet, Map jsonEntry) {
+    if (jsonEntry.containsKey("num.nodes")) {
+      int numNodes = Integer.parseInt(jsonEntry.get("num.nodes").toString());
+      int numRacks = 1;
+      if (jsonEntry.containsKey("num.racks")) {
+        numRacks = Integer.parseInt(
+            jsonEntry.get("num.racks").toString());
+      }
+      nodeSet.addAll(generateNodes(numNodes, numRacks));
+    }
+
+    if (jsonEntry.containsKey("job.tasks")) {
+      List tasks = (List) jsonEntry.get("job.tasks");
+      for (Object o : tasks) {
+        Map jsonTask = (Map) o;
+        String hostname = (String) jsonTask.get("container.host");
+        if (hostname != null) {
+          nodeSet.add(hostname);
+        }
+      }
+    }
+  }
+
   /**
    * parse the input node file, return each host name
    */
@@ -134,8 +151,7 @@ public class SLSUtils {
     Reader input =
         new InputStreamReader(new FileInputStream(nodeFile), "UTF-8");
     try {
-      Iterator<Map> i = mapper.readValues(
-              jsonF.createJsonParser(input), Map.class);
+      Iterator<Map> i = mapper.readValues(jsonF.createParser(input), Map.class);
       while (i.hasNext()) {
         Map jsonE = i.next();
         String rack = "/" + jsonE.get("rack");
@@ -150,4 +166,21 @@ public class SLSUtils {
     }
     return nodeSet;
   }
+
+  public static Set<? extends String> generateNodes(int numNodes,
+      int numRacks){
+    Set<String> nodeSet = new HashSet<>();
+    if (numRacks < 1) {
+      numRacks = 1;
+    }
+
+    if (numRacks > numNodes) {
+      numRacks = numNodes;
+    }
+
+    for (int i = 0; i < numNodes; i++) {
+      nodeSet.add("/rack" + i % numRacks + "/node" + i);
+    }
+    return nodeSet;
+  }
 }
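
With this change an SLS trace entry can declare its cluster directly via the new "num.nodes"/"num.racks" keys handled by addNodes, instead of listing container hosts, and generateNodes spreads the nodes round-robin across racks. A small illustrative call (values assumed):

  import java.util.Set;

  import org.apache.hadoop.yarn.sls.utils.SLSUtils;

  public class GenerateNodesSketch {
    public static void main(String[] args) {
      // 5 nodes spread round-robin over 2 racks
      Set<? extends String> nodes = SLSUtils.generateNodes(5, 2);
      System.out.println(nodes);
      // expected members (a HashSet, so print order may differ):
      // /rack0/node0, /rack1/node1, /rack0/node2, /rack1/node3, /rack0/node4
    }
  }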

