Repository: aurora
Updated Branches:
  refs/heads/master de8b37549 -> cea43db9d


http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java 
b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
deleted file mode 100644
index 3c056c9..0000000
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReentrantLock;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.codec.ThriftBinaryCodec.CodingException;
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Data;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.AssignedTask;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.InstanceTaskConfig;
-import org.apache.aurora.gen.JobConfiguration;
-import org.apache.aurora.gen.JobInstanceUpdateEvent;
-import org.apache.aurora.gen.JobUpdate;
-import org.apache.aurora.gen.JobUpdateAction;
-import org.apache.aurora.gen.JobUpdateEvent;
-import org.apache.aurora.gen.JobUpdateInstructions;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.JobUpdateSettings;
-import org.apache.aurora.gen.JobUpdateStatus;
-import org.apache.aurora.gen.JobUpdateSummary;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.Range;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.ScheduleStatus;
-import org.apache.aurora.gen.ScheduledTask;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.gen.storage.DeduplicatedSnapshot;
-import org.apache.aurora.gen.storage.LogEntry;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.PruneJobUpdateHistory;
-import org.apache.aurora.gen.storage.RemoveJob;
-import org.apache.aurora.gen.storage.RemoveJobUpdates;
-import org.apache.aurora.gen.storage.RemoveQuota;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveCronJob;
-import org.apache.aurora.gen.storage.SaveFrameworkId;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent;
-import org.apache.aurora.gen.storage.SaveJobUpdate;
-import org.apache.aurora.gen.storage.SaveJobUpdateEvent;
-import org.apache.aurora.gen.storage.SaveQuota;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.gen.storage.Snapshot;
-import org.apache.aurora.gen.storage.Transaction;
-import org.apache.aurora.gen.storage.storageConstants;
-import org.apache.aurora.scheduler.base.JobKeys;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.base.Tasks;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.log.Log;
-import org.apache.aurora.scheduler.log.Log.Entry;
-import org.apache.aurora.scheduler.log.Log.Position;
-import org.apache.aurora.scheduler.log.Log.Stream;
-import org.apache.aurora.scheduler.resources.ResourceTestUtil;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.SnapshotStore;
-import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
-import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobKey;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdate;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.apache.aurora.scheduler.storage.log.LogStorage.SchedulingService;
-import 
org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
-import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
-import 
org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.easymock.Capture;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeConfig;
-import static org.apache.aurora.scheduler.base.TaskTestUtil.makeTask;
-import static org.easymock.EasyMock.capture;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.notNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class LogStorageTest extends EasyMockTest {
-
-  private static final Amount<Long, Time> SNAPSHOT_INTERVAL = Amount.of(1L, 
Time.MINUTES);
-  private static final IJobKey JOB_KEY = JobKeys.from("role", "env", "name");
-  private static final IJobUpdateKey UPDATE_ID =
-      IJobUpdateKey.build(new JobUpdateKey(JOB_KEY.newBuilder(), 
"testUpdateId"));
-  private static final long NOW = 42;
-
-  private LogStorage logStorage;
-  private Log log;
-  private SnapshotDeduplicator deduplicator;
-  private Stream stream;
-  private Position position;
-  private StreamMatcher streamMatcher;
-  private SchedulingService schedulingService;
-  private SnapshotStore<Snapshot> snapshotStore;
-  private StorageTestUtil storageUtil;
-  private EventSink eventSink;
-
-  @Before
-  public void setUp() {
-    log = createMock(Log.class);
-    deduplicator = createMock(SnapshotDeduplicator.class);
-
-    StreamManagerFactory streamManagerFactory = logStream -> {
-      HashFunction md5 = Hashing.md5();
-      return new StreamManagerImpl(
-          logStream,
-          new EntrySerializer.EntrySerializerImpl(Amount.of(1, Data.GB), md5),
-          md5,
-          deduplicator);
-    };
-    LogManager logManager = new LogManager(log, streamManagerFactory);
-
-    schedulingService = createMock(SchedulingService.class);
-    snapshotStore = createMock(new Clazz<SnapshotStore<Snapshot>>() { });
-    storageUtil = new StorageTestUtil(this);
-    eventSink = createMock(EventSink.class);
-
-    logStorage = new LogStorage(
-        logManager,
-        schedulingService,
-        snapshotStore,
-        SNAPSHOT_INTERVAL,
-        storageUtil.storage,
-        storageUtil.schedulerStore,
-        storageUtil.jobStore,
-        storageUtil.taskStore,
-        storageUtil.quotaStore,
-        storageUtil.attributeStore,
-        storageUtil.jobUpdateStore,
-        eventSink,
-        new ReentrantLock(),
-        TaskTestUtil.THRIFT_BACKFILL);
-
-    stream = createMock(Stream.class);
-    streamMatcher = LogOpMatcher.matcherFor(stream);
-    position = createMock(Position.class);
-
-    storageUtil.storage.prepare();
-  }
-
-  @Test
-  public void testStart() throws Exception {
-    // We should open the log and arrange for its clean shutdown.
-    expect(log.open()).andReturn(stream);
-
-    // Our start should recover the log and then forward to the underlying 
storage start of the
-    // supplied initialization logic.
-    AtomicBoolean initialized = new AtomicBoolean(false);
-    MutateWork.NoResult.Quiet initializationLogic = provider -> {
-      // Creating a mock and expecting apply(storeProvider) does not work here 
for whatever
-      // reason.
-      initialized.set(true);
-    };
-
-    Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = 
createCapture();
-    storageUtil.storage.write(capture(recoverAndInitializeWork));
-    expectLastCall().andAnswer(() -> {
-      
recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
-      return null;
-    });
-
-    Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
-    expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
-        () -> {
-          recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        });
-
-    Capture<MutateWork<Void, RuntimeException>> initializationWork = 
createCapture();
-    expect(storageUtil.storage.write(capture(initializationWork))).andAnswer(
-        () -> {
-          
initializationWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        });
-
-    // We should perform a snapshot when the snapshot thread runs.
-    Capture<Runnable> snapshotAction = createCapture();
-    schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), capture(snapshotAction));
-    Snapshot snapshotContents = new Snapshot()
-        .setTimestamp(NOW)
-        .setTasks(ImmutableSet.of(makeTask("task_id", 
TaskTestUtil.JOB).newBuilder()));
-    expect(snapshotStore.createSnapshot()).andReturn(snapshotContents);
-    DeduplicatedSnapshot deduplicated =
-        new SnapshotDeduplicatorImpl().deduplicate(snapshotContents);
-    expect(deduplicator.deduplicate(snapshotContents)).andReturn(deduplicated);
-    streamMatcher.expectSnapshot(deduplicated).andReturn(position);
-    stream.truncateBefore(position);
-    Capture<MutateWork<Void, RuntimeException>> snapshotWork = createCapture();
-    expect(storageUtil.storage.write(capture(snapshotWork))).andAnswer(
-        () -> {
-          snapshotWork.getValue().apply(storageUtil.mutableStoreProvider);
-          return null;
-        }).anyTimes();
-
-    // Populate all LogEntry types.
-    buildReplayLogEntries();
-
-    control.replay();
-
-    logStorage.prepare();
-    logStorage.start(initializationLogic);
-    assertTrue(initialized.get());
-
-    // Run the snapshot thread.
-    snapshotAction.getValue().run();
-
-    // Assert all LogEntry types have handlers defined.
-    // Our current StreamManagerImpl.readFromBeginning() does not let some 
entries escape
-    // the decoding routine making handling them in replay unnecessary.
-    assertEquals(
-        Sets.complementOf(EnumSet.of(
-            LogEntry._Fields.FRAME,
-            LogEntry._Fields.DEDUPLICATED_SNAPSHOT,
-            LogEntry._Fields.DEFLATED_ENTRY)),
-        EnumSet.copyOf(logStorage.buildLogEntryReplayActions().keySet()));
-
-    // Assert all Transaction types have handlers defined.
-    assertEquals(
-        EnumSet.allOf(Op._Fields.class),
-        EnumSet.copyOf(logStorage.buildTransactionReplayActions().keySet()));
-  }
-
-  private void buildReplayLogEntries() throws Exception {
-    ImmutableSet.Builder<LogEntry> builder = ImmutableSet.builder();
-
-    builder.add(createTransaction(Op.saveFrameworkId(new 
SaveFrameworkId("bob"))));
-    storageUtil.schedulerStore.saveFrameworkId("bob");
-
-    JobConfiguration actualJob = new 
JobConfiguration().setTaskConfig(nonBackfilledConfig());
-    JobConfiguration expectedJob =
-        new JobConfiguration().setTaskConfig(makeConfig(JOB_KEY).newBuilder());
-    SaveCronJob cronJob = new SaveCronJob().setJobConfig(actualJob);
-    builder.add(createTransaction(Op.saveCronJob(cronJob)));
-    storageUtil.jobStore.saveAcceptedJob(IJobConfiguration.build(expectedJob));
-
-    RemoveJob removeJob = new RemoveJob(JOB_KEY.newBuilder());
-    builder.add(createTransaction(Op.removeJob(removeJob)));
-    storageUtil.jobStore.removeJob(JOB_KEY);
-
-    ScheduledTask actualTask = makeTask("id", JOB_KEY).newBuilder();
-    actualTask.getAssignedTask().setTask(nonBackfilledConfig());
-    IScheduledTask expectedTask = makeTask("id", JOB_KEY);
-    SaveTasks saveTasks = new SaveTasks(ImmutableSet.of(actualTask));
-    builder.add(createTransaction(Op.saveTasks(saveTasks)));
-    storageUtil.taskStore.saveTasks(ImmutableSet.of(expectedTask));
-
-    RemoveTasks removeTasks = new RemoveTasks(ImmutableSet.of("taskId1"));
-    builder.add(createTransaction(Op.removeTasks(removeTasks)));
-    storageUtil.taskStore.deleteTasks(removeTasks.getTaskIds());
-
-    ResourceAggregate nonBackfilled = new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64);
-    SaveQuota saveQuota = new SaveQuota(JOB_KEY.getRole(), nonBackfilled);
-    builder.add(createTransaction(Op.saveQuota(saveQuota)));
-    storageUtil.quotaStore.saveQuota(
-        saveQuota.getRole(),
-        IResourceAggregate.build(nonBackfilled.deepCopy()
-            .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), 
diskMb(64)))));
-
-    builder.add(createTransaction(Op.removeQuota(new 
RemoveQuota(JOB_KEY.getRole()))));
-    storageUtil.quotaStore.removeQuota(JOB_KEY.getRole());
-
-    // This entry lacks a slave ID, and should therefore be discarded.
-    SaveHostAttributes hostAttributes1 = new SaveHostAttributes(new 
HostAttributes()
-        .setHost("host1")
-        .setMode(MaintenanceMode.DRAINED));
-    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes1)));
-
-    SaveHostAttributes hostAttributes2 = new SaveHostAttributes(new 
HostAttributes()
-        .setHost("host2")
-        .setSlaveId("slave2")
-        .setMode(MaintenanceMode.DRAINED));
-    builder.add(createTransaction(Op.saveHostAttributes(hostAttributes2)));
-    expect(storageUtil.attributeStore.saveHostAttributes(
-        
IHostAttributes.build(hostAttributes2.getHostAttributes()))).andReturn(true);
-
-    JobUpdate actualUpdate = new JobUpdate()
-        .setSummary(new JobUpdateSummary().setKey(UPDATE_ID.newBuilder()))
-        .setInstructions(new JobUpdateInstructions()
-            .setInitialState(
-                ImmutableSet.of(new 
InstanceTaskConfig().setTask(nonBackfilledConfig())))
-            .setDesiredState(new 
InstanceTaskConfig().setTask(nonBackfilledConfig())));
-    JobUpdate expectedUpdate = actualUpdate.deepCopy();
-    
expectedUpdate.getInstructions().getDesiredState().setTask(makeConfig(JOB_KEY).newBuilder());
-    expectedUpdate.getInstructions().getInitialState()
-        .forEach(e -> e.setTask(makeConfig(JOB_KEY).newBuilder()));
-    SaveJobUpdate saveUpdate = new SaveJobUpdate().setJobUpdate(actualUpdate);
-    builder.add(createTransaction(Op.saveJobUpdate(saveUpdate)));
-    storageUtil.jobUpdateStore.saveJobUpdate(IJobUpdate.build(expectedUpdate));
-
-    SaveJobUpdateEvent saveUpdateEvent =
-        new SaveJobUpdateEvent(new JobUpdateEvent(), UPDATE_ID.newBuilder());
-    builder.add(createTransaction(Op.saveJobUpdateEvent(saveUpdateEvent)));
-    storageUtil.jobUpdateStore.saveJobUpdateEvent(
-        UPDATE_ID,
-        IJobUpdateEvent.build(saveUpdateEvent.getEvent()));
-
-    SaveJobInstanceUpdateEvent saveInstanceEvent = new 
SaveJobInstanceUpdateEvent(
-        new JobInstanceUpdateEvent(),
-        UPDATE_ID.newBuilder());
-    
builder.add(createTransaction(Op.saveJobInstanceUpdateEvent(saveInstanceEvent)));
-    storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(
-        UPDATE_ID,
-        IJobInstanceUpdateEvent.build(saveInstanceEvent.getEvent()));
-
-    builder.add(createTransaction(Op.pruneJobUpdateHistory(new 
PruneJobUpdateHistory(5, 10L))));
-    // No expectation - this op is ignored.
-
-    builder.add(createTransaction(Op.removeJobUpdate(
-        new 
RemoveJobUpdates().setKeys(ImmutableSet.of(UPDATE_ID.newBuilder())))));
-    storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(UPDATE_ID));
-
-    // NOOP LogEntry
-    builder.add(LogEntry.noop(true));
-
-    // Snapshot LogEntry
-    Snapshot snapshot = new Snapshot();
-    builder.add(LogEntry.snapshot(snapshot));
-    snapshotStore.applySnapshot(snapshot);
-
-    ImmutableSet.Builder<Entry> entryBuilder = ImmutableSet.builder();
-    for (LogEntry logEntry : builder.build()) {
-      Entry entry = createMock(Entry.class);
-      entryBuilder.add(entry);
-      
expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
-    }
-
-    expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
-  }
-
-  private TaskConfig nonBackfilledConfig() {
-    // When more fields have to be backfilled
-    // modify this method.
-    return makeConfig(JOB_KEY).newBuilder();
-  }
-
-  abstract class AbstractStorageFixture {
-    private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
-    AbstractStorageFixture() {
-      // Prevent otherwise silent noop tests that forget to call run().
-      addTearDown(new TearDown() {
-        @Override
-        public void tearDown() {
-          assertTrue(runCalled.get());
-        }
-      });
-    }
-
-    void run() throws Exception {
-      runCalled.set(true);
-
-      // Expect basic start operations.
-
-      // Open the log stream.
-      expect(log.open()).andReturn(stream);
-
-      // Replay the log and perform and supplied initializationWork.
-      // Simulate NOOP initialization work
-      // Creating a mock and expecting apply(storeProvider) does not work here 
for whatever
-      // reason.
-      MutateWork.NoResult.Quiet initializationLogic = storeProvider -> {
-        // No-op.
-      };
-
-      Capture<MutateWork.NoResult.Quiet> recoverAndInitializeWork = 
createCapture();
-      storageUtil.storage.write(capture(recoverAndInitializeWork));
-      expectLastCall().andAnswer(() -> {
-        
recoverAndInitializeWork.getValue().apply(storageUtil.mutableStoreProvider);
-        return null;
-      });
-
-      expect(stream.readAll()).andReturn(Collections.emptyIterator());
-      Capture<MutateWork<Void, RuntimeException>> recoveryWork = 
createCapture();
-      expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
-          () -> {
-            recoveryWork.getValue().apply(storageUtil.mutableStoreProvider);
-            return null;
-          });
-
-      // Schedule snapshots.
-      schedulingService.doEvery(eq(SNAPSHOT_INTERVAL), 
notNull(Runnable.class));
-
-      // Setup custom test expectations.
-      setupExpectations();
-
-      control.replay();
-
-      // Start the system.
-      logStorage.prepare();
-      logStorage.start(initializationLogic);
-
-      // Exercise the system.
-      runTest();
-    }
-
-    protected void setupExpectations() throws Exception {
-      // Default to no expectations.
-    }
-
-    protected abstract void runTest();
-  }
-
-  abstract class AbstractMutationFixture extends AbstractStorageFixture {
-    @Override
-    protected void runTest() {
-      logStorage.write((Quiet) AbstractMutationFixture.this::performMutations);
-    }
-
-    protected abstract void performMutations(MutableStoreProvider 
storeProvider);
-  }
-
-  @Test
-  public void testSaveFrameworkId() throws Exception {
-    String frameworkId = "bob";
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws CodingException {
-        storageUtil.expectWrite();
-        storageUtil.schedulerStore.saveFrameworkId(frameworkId);
-        streamMatcher.expectTransaction(Op.saveFrameworkId(new 
SaveFrameworkId(frameworkId)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getSchedulerStore().saveFrameworkId(frameworkId);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAcceptedJob() throws Exception {
-    IJobConfiguration jobConfig =
-        IJobConfiguration.build(new 
JobConfiguration().setKey(JOB_KEY.newBuilder()));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobStore.saveAcceptedJob(jobConfig);
-        streamMatcher.expectTransaction(
-            Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getCronJobStore().saveAcceptedJob(jobConfig);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveJob() throws Exception {
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobStore.removeJob(JOB_KEY);
-        streamMatcher.expectTransaction(
-            Op.removeJob(new RemoveJob().setJobKey(JOB_KEY.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getCronJobStore().removeJob(JOB_KEY);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveTasks() throws Exception {
-    Set<IScheduledTask> tasks = ImmutableSet.of(task("a", 
ScheduleStatus.INIT));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(tasks);
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(tasks))))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(tasks);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testMutateTasks() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Optional<IScheduledTask> mutated = Optional.of(task("a", 
ScheduleStatus.STARTING));
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTask(taskId, 
mutation)).andReturn(mutated);
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new 
SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
-            .andReturn(null);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, 
storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testNestedTransactions() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Optional<IScheduledTask> mutated = Optional.of(task("a", 
ScheduleStatus.STARTING));
-    ImmutableSet<String> tasksToRemove = ImmutableSet.of("b");
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.taskStore.mutateTask(taskId, 
mutation)).andReturn(mutated);
-
-        storageUtil.taskStore.deleteTasks(tasksToRemove);
-
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new 
SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))),
-            Op.removeTasks(new RemoveTasks(tasksToRemove)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        assertEquals(mutated, 
storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-
-        logStorage.write((NoResult.Quiet)
-            innerProvider -> 
innerProvider.getUnsafeTaskStore().deleteTasks(tasksToRemove));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAndMutateTasks() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Set<IScheduledTask> saved = ImmutableSet.of(task("a", 
ScheduleStatus.INIT));
-    Optional<IScheduledTask> mutated = Optional.of(task("a", 
ScheduleStatus.PENDING));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(saved);
-
-        // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTask(taskId, 
mutation)).andReturn(mutated);
-
-        // Resulting stream operation.
-        streamMatcher.expectTransaction(Op.saveTasks(
-            new SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))))
-            .andReturn(null);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, 
storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveAndMutateTasksNoCoalesceUniqueIds() throws Exception {
-    String taskId = "fred";
-    Function<IScheduledTask, IScheduledTask> mutation = Functions.identity();
-    Set<IScheduledTask> saved = ImmutableSet.of(task("b", 
ScheduleStatus.INIT));
-    Optional<IScheduledTask> mutated = Optional.of(task("a", 
ScheduleStatus.PENDING));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.saveTasks(saved);
-
-        // Nested transaction with result.
-        expect(storageUtil.taskStore.mutateTask(taskId, 
mutation)).andReturn(mutated);
-
-        // Resulting stream operation.
-        streamMatcher.expectTransaction(
-            Op.saveTasks(new SaveTasks(
-                ImmutableSet.<ScheduledTask>builder()
-                    .addAll(IScheduledTask.toBuildersList(saved))
-                    .add(mutated.get().newBuilder())
-                    .build())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().saveTasks(saved);
-        assertEquals(mutated, 
storeProvider.getUnsafeTaskStore().mutateTask(taskId, mutation));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveTasksQuery() throws Exception {
-    IScheduledTask task = task("a", ScheduleStatus.FINISHED);
-    Set<String> taskIds = Tasks.ids(task);
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.deleteTasks(taskIds);
-        streamMatcher.expectTransaction(Op.removeTasks(new 
RemoveTasks(taskIds)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveTasksIds() throws Exception {
-    Set<String> taskIds = ImmutableSet.of("42");
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.taskStore.deleteTasks(taskIds);
-        streamMatcher.expectTransaction(Op.removeTasks(new 
RemoveTasks(taskIds)))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getUnsafeTaskStore().deleteTasks(taskIds);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveQuota() throws Exception {
-    String role = "role";
-    IResourceAggregate quota = ResourceTestUtil.aggregate(1.0, 128L, 1024L);
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.quotaStore.saveQuota(role, quota);
-        streamMatcher.expectTransaction(Op.saveQuota(new SaveQuota(role, 
quota.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().saveQuota(role, quota);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveQuota() throws Exception {
-    String role = "role";
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.quotaStore.removeQuota(role);
-        streamMatcher.expectTransaction(Op.removeQuota(new 
RemoveQuota(role))).andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getQuotaStore().removeQuota(role);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveHostAttributes() throws Exception {
-    String host = "hostname";
-    Set<Attribute> attributes =
-        ImmutableSet.of(new 
Attribute().setName("attr").setValues(ImmutableSet.of("value")));
-    Optional<IHostAttributes> hostAttributes = Optional.of(
-        IHostAttributes.build(new HostAttributes()
-            .setHost(host)
-            .setAttributes(attributes)));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        expect(storageUtil.attributeStore.getHostAttributes(host))
-            .andReturn(Optional.absent());
-
-        
expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
-
-        
expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get())).andReturn(true);
-        eventSink.post(new 
PubsubEvent.HostAttributesChanged(hostAttributes.get()));
-        streamMatcher.expectTransaction(
-            Op.saveHostAttributes(new 
SaveHostAttributes(hostAttributes.get().newBuilder())))
-            .andReturn(position);
-
-        
expect(storageUtil.attributeStore.saveHostAttributes(hostAttributes.get()))
-            .andReturn(false);
-
-        
expect(storageUtil.attributeStore.getHostAttributes(host)).andReturn(hostAttributes);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        AttributeStore.Mutable store = storeProvider.getAttributeStore();
-        assertEquals(Optional.absent(), store.getHostAttributes(host));
-
-        assertTrue(store.saveHostAttributes(hostAttributes.get()));
-
-        assertEquals(hostAttributes, store.getHostAttributes(host));
-
-        assertFalse(store.saveHostAttributes(hostAttributes.get()));
-
-        assertEquals(hostAttributes, store.getHostAttributes(host));
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveUpdate() throws Exception {
-    IJobUpdate update = IJobUpdate.build(new JobUpdate()
-        .setSummary(new JobUpdateSummary()
-            .setKey(UPDATE_ID.newBuilder())
-            .setUser("user"))
-        .setInstructions(new JobUpdateInstructions()
-            .setDesiredState(new InstanceTaskConfig()
-                .setTask(new TaskConfig())
-                .setInstances(ImmutableSet.of(new Range(0, 3))))
-            .setInitialState(ImmutableSet.of(new InstanceTaskConfig()
-                .setTask(new TaskConfig())
-                .setInstances(ImmutableSet.of(new Range(0, 3)))))
-            .setSettings(new JobUpdateSettings())));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobUpdate(update);
-        streamMatcher.expectTransaction(
-            Op.saveJobUpdate(new 
SaveJobUpdate().setJobUpdate(update.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdate(update);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveJobUpdateEvent() throws Exception {
-    IJobUpdateEvent event = IJobUpdateEvent.build(new JobUpdateEvent()
-        .setStatus(JobUpdateStatus.ROLLING_BACK)
-        .setTimestampMs(12345L));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobUpdateEvent(UPDATE_ID, event);
-        streamMatcher.expectTransaction(Op.saveJobUpdateEvent(new 
SaveJobUpdateEvent(
-            event.newBuilder(),
-            UPDATE_ID.newBuilder()))).andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        storeProvider.getJobUpdateStore().saveJobUpdateEvent(UPDATE_ID, event);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testSaveJobInstanceUpdateEvent() throws Exception {
-    IJobInstanceUpdateEvent event = IJobInstanceUpdateEvent.build(new 
JobInstanceUpdateEvent()
-        .setAction(JobUpdateAction.INSTANCE_ROLLING_BACK)
-        .setTimestampMs(12345L)
-        .setInstanceId(0));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.saveJobInstanceUpdateEvent(UPDATE_ID, 
event);
-        streamMatcher.expectTransaction(Op.saveJobInstanceUpdateEvent(
-            new SaveJobInstanceUpdateEvent(
-                event.newBuilder(),
-                UPDATE_ID.newBuilder())))
-            .andReturn(position);
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        
storeProvider.getJobUpdateStore().saveJobInstanceUpdateEvent(UPDATE_ID, event);
-      }
-    }.run();
-  }
-
-  @Test
-  public void testRemoveJobUpdates() throws Exception {
-    IJobUpdateKey key = IJobUpdateKey.build(new JobUpdateKey()
-        .setJob(JOB_KEY.newBuilder())
-        .setId("update-id"));
-
-    new AbstractMutationFixture() {
-      @Override
-      protected void setupExpectations() throws Exception {
-        storageUtil.expectWrite();
-        storageUtil.jobUpdateStore.removeJobUpdates(ImmutableSet.of(key));
-
-        // No log transaction is generated since this version is currently in 
'read-only'
-        // compatibility mode for this operation type.
-      }
-
-      @Override
-      protected void performMutations(MutableStoreProvider storeProvider) {
-        
storeProvider.getJobUpdateStore().removeJobUpdates(ImmutableSet.of(key));
-      }
-    }.run();
-  }
-
-  private LogEntry createTransaction(Op... ops) {
-    return LogEntry.transaction(
-        new Transaction(ImmutableList.copyOf(ops), 
storageConstants.CURRENT_SCHEMA_VERSION));
-  }
-
-  private static IScheduledTask task(String id, ScheduleStatus status) {
-    return IScheduledTask.build(new ScheduledTask()
-        .setStatus(status)
-        .setAssignedTask(new AssignedTask()
-            .setTaskId(id)
-            .setTask(new TaskConfig())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
 
b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
index f43a836..eb966d7 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -42,6 +42,7 @@ import org.apache.aurora.scheduler.config.types.TimeAmount;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.log.Log;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.SnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
@@ -61,6 +62,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
   private FakeLog log;
   private Runnable teardown = () -> { };
   private NonVolatileStorage storage;
+  private DistributedSnapshotStore snapshotStore;
 
   @Before
   public void setUp() {
@@ -95,6 +97,7 @@ public class NonVolatileStorageTest extends TearDownTestCase {
         }
     );
     storage = injector.getInstance(NonVolatileStorage.class);
+    snapshotStore = injector.getInstance(DistributedSnapshotStore.class);
     storage.prepare();
     storage.start(w -> { });
 
@@ -147,7 +150,7 @@ public class NonVolatileStorageTest extends 
TearDownTestCase {
           });
 
           // Result should survive another reset.
-          storage.snapshot();
+          snapshotStore.snapshot();
           resetStorage();
           storage.read(stores -> {
             transaction.getSecond().accept(stores);

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
 
b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
index a1944c4..5634f92 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImplIT.java
@@ -55,6 +55,7 @@ import org.apache.aurora.scheduler.base.JobKeys;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceBag;
 import org.apache.aurora.scheduler.storage.Storage;
+import org.apache.aurora.scheduler.storage.durability.ThriftBackfill;
 import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
 import org.apache.aurora.scheduler.storage.entities.IJobConfiguration;
 import org.apache.aurora.scheduler.storage.entities.IJobKey;

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java 
b/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
deleted file mode 100644
index 59c2c5b..0000000
--- 
a/src/test/java/org/apache/aurora/scheduler/storage/log/ThriftBackfillTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.ResourceAggregate;
-import org.apache.aurora.gen.TaskConfig;
-import org.apache.aurora.scheduler.TierManager;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
-import org.apache.aurora.scheduler.storage.entities.ITaskConfig;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.apache.aurora.gen.Resource.diskMb;
-import static org.apache.aurora.gen.Resource.namedPort;
-import static org.apache.aurora.gen.Resource.numCpus;
-import static org.apache.aurora.gen.Resource.ramMb;
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-public class ThriftBackfillTest extends EasyMockTest {
-
-  private ThriftBackfill thriftBackfill;
-  private TierManager tierManager;
-
-  @Before
-  public void setUp() {
-    tierManager = createMock(TierManager.class);
-    thriftBackfill = new ThriftBackfill(tierManager);
-  }
-
-  @Test
-  public void testFieldsToSetNoPorts() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(
-            numCpus(1.0),
-            ramMb(32),
-            diskMb(64)))
-        .setProduction(false)
-        .setTier("tierName");
-    TaskConfig expected = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    
expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.DEV_TIER);
-
-    control.replay();
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testResourceAggregateFieldsToSet() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64);
-
-    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64))));
-
-    assertEquals(expected, 
ThriftBackfill.backfillResourceAggregate(aggregate));
-  }
-
-  @Test
-  public void testResourceAggregateSetToFields() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    IResourceAggregate expected = IResourceAggregate.build(aggregate.deepCopy()
-        .setNumCpus(1.0)
-        .setRamMb(32)
-        .setDiskMb(64));
-
-    assertEquals(expected, 
ThriftBackfill.backfillResourceAggregate(aggregate));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateTooManyResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64), 
numCpus(2.0)));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateInvalidResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), 
namedPort("http")));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testResourceAggregateMissingResources() {
-    control.replay();
-
-    ResourceAggregate aggregate = new ResourceAggregate()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32)));
-    ThriftBackfill.backfillResourceAggregate(aggregate);
-  }
-
-  @Test
-  public void testBackfillTierProduction() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
-        .setProduction(true)
-        .setTier("tierName");
-    TaskConfig expected = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    
expect(tierManager.getTier(ITaskConfig.build(expected))).andReturn(TaskTestUtil.PREFERRED_TIER);
-
-    control.replay();
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierNotProduction() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)))
-        .setProduction(true)
-        .setTier("tierName");
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    
expect(tierManager.getTier(ITaskConfig.build(configWithBackfilledResources)))
-        .andReturn(TaskTestUtil.DEV_TIER);
-
-    control.replay();
-
-    TaskConfig expected = configWithBackfilledResources.deepCopy()
-        .setProduction(false);
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierSetsTierToPreemptible() {
-    TaskConfig config = new TaskConfig()
-            .setResources(ImmutableSet.of(
-                    numCpus(1.0),
-                    ramMb(32),
-                    diskMb(64)));
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
-    control.replay();
-
-    TaskConfig expected = 
configWithBackfilledResources.deepCopy().setTier("preemptible");
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test
-  public void testBackfillTierSetsTierToPreferred() {
-    TaskConfig config = new TaskConfig()
-        .setResources(ImmutableSet.of(
-            numCpus(1.0),
-            ramMb(32),
-            diskMb(64)))
-        .setProduction(true);
-    TaskConfig configWithBackfilledResources = config.deepCopy()
-        .setResources(ImmutableSet.of(numCpus(1.0), ramMb(32), diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(TaskTestUtil.tierInfos());
-
-    control.replay();
-
-    TaskConfig expected = 
configWithBackfilledResources.deepCopy().setTier("preferred");
-
-    assertEquals(
-        expected,
-        thriftBackfill.backfillTask(config));
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testBackfillTierBadTierConfiguration() {
-    TaskConfig config = new TaskConfig()
-            .setResources(ImmutableSet.of(
-                    numCpus(1.0),
-                    ramMb(32),
-                    diskMb(64)));
-
-    expect(tierManager.getTiers()).andReturn(ImmutableMap.of());
-
-    control.replay();
-
-    thriftBackfill.backfillTask(config);
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
 
b/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
deleted file mode 100644
index 8a99b36..0000000
--- 
a/src/test/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorageTest.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import java.util.Set;
-
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.Attribute;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.gen.JobUpdateKey;
-import org.apache.aurora.gen.MaintenanceMode;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
-import org.apache.aurora.scheduler.base.TaskTestUtil;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent;
-import org.apache.aurora.scheduler.storage.AttributeStore;
-import org.apache.aurora.scheduler.storage.CronJobStore;
-import org.apache.aurora.scheduler.storage.JobUpdateStore;
-import org.apache.aurora.scheduler.storage.QuotaStore;
-import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.TaskStore;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey;
-import org.apache.aurora.scheduler.storage.entities.IScheduledTask;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.LoggerFactory;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class WriteAheadStorageTest extends EasyMockTest {
-
-  private LogStorage.TransactionManager transactionManager;
-  private TaskStore.Mutable taskStore;
-  private AttributeStore.Mutable attributeStore;
-  private JobUpdateStore.Mutable jobUpdateStore;
-  private EventSink eventSink;
-  private WriteAheadStorage storage;
-
-  @Before
-  public void setUp() {
-    transactionManager = createMock(LogStorage.TransactionManager.class);
-    taskStore = createMock(TaskStore.Mutable.class);
-    attributeStore = createMock(AttributeStore.Mutable.class);
-    jobUpdateStore = createMock(JobUpdateStore.Mutable.class);
-    eventSink = createMock(EventSink.class);
-
-    storage = new WriteAheadStorage(
-        transactionManager,
-        createMock(SchedulerStore.Mutable.class),
-        createMock(CronJobStore.Mutable.class),
-        taskStore,
-        createMock(QuotaStore.Mutable.class),
-        attributeStore,
-        jobUpdateStore,
-        LoggerFactory.getLogger(WriteAheadStorageTest.class),
-        eventSink);
-  }
-
-  private void expectOp(Op op) {
-    expect(transactionManager.hasActiveTransaction()).andReturn(true);
-    transactionManager.log(op);
-  }
-
-  @Test
-  public void testRemoveUpdates() {
-    Set<IJobUpdateKey> removed = ImmutableSet.of(
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), 
"a")),
-        IJobUpdateKey.build(new JobUpdateKey(TaskTestUtil.JOB.newBuilder(), 
"b")));
-    jobUpdateStore.removeJobUpdates(removed);
-    // No operation is written since this Op is in read-only compatibility 
mode.
-
-    control.replay();
-
-    storage.removeJobUpdates(removed);
-  }
-
-  @Test
-  public void testMutate() {
-    String taskId = "a";
-    Function<IScheduledTask, IScheduledTask> mutator =
-        createMock(new Clazz<Function<IScheduledTask, IScheduledTask>>() { });
-    Optional<IScheduledTask> mutated = 
Optional.of(TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB));
-
-    expect(taskStore.mutateTask(taskId, mutator)).andReturn(mutated);
-    expectOp(Op.saveTasks(new 
SaveTasks(ImmutableSet.of(mutated.get().newBuilder()))));
-
-    control.replay();
-
-    assertEquals(mutated, storage.mutateTask(taskId, mutator));
-  }
-
-  @Test
-  public void testSaveHostAttributes() {
-    IHostAttributes attributes = IHostAttributes.build(
-        new HostAttributes()
-            .setHost("a")
-            .setMode(MaintenanceMode.DRAINING)
-            .setAttributes(ImmutableSet.of(
-                new Attribute().setName("b").setValues(ImmutableSet.of("1", 
"2")))));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(true);
-    expectOp(Op.saveHostAttributes(
-        new SaveHostAttributes().setHostAttributes(attributes.newBuilder())));
-    eventSink.post(new PubsubEvent.HostAttributesChanged(attributes));
-
-    expect(attributeStore.saveHostAttributes(attributes)).andReturn(false);
-
-    control.replay();
-
-    assertTrue(storage.saveHostAttributes(attributes));
-
-    assertFalse(storage.saveHostAttributes(attributes));
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllTasks() {
-    control.replay();
-    storage.deleteAllTasks();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteHostAttributes() {
-    control.replay();
-    storage.deleteHostAttributes();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteJobs() {
-    control.replay();
-    storage.deleteJobs();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteQuotas() {
-    control.replay();
-    storage.deleteQuotas();
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testDeleteAllUpdatesAndEvents() {
-    control.replay();
-    storage.deleteAllUpdates();
-  }
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
index 42a79a6..8837384 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterfaceTest.java
@@ -84,6 +84,7 @@ import 
org.apache.aurora.scheduler.state.MaintenanceController;
 import org.apache.aurora.scheduler.state.StateChangeResult;
 import org.apache.aurora.scheduler.state.StateManager;
 import org.apache.aurora.scheduler.state.UUIDGenerator;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
 import org.apache.aurora.scheduler.storage.backup.StorageBackup;
@@ -176,6 +177,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
       ImmutableSet.of(new Metadata("k1", "v1"), new Metadata("k2", "v2"));
 
   private StorageTestUtil storageUtil;
+  private DistributedSnapshotStore snapshotStore;
   private StorageBackup backup;
   private Recovery recovery;
   private MaintenanceController maintenance;
@@ -194,6 +196,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
   public void setUp() throws Exception {
     storageUtil = new StorageTestUtil(this);
     storageUtil.expectOperations();
+    snapshotStore = createMock(DistributedSnapshotStore.class);
     backup = createMock(StorageBackup.class);
     recovery = createMock(Recovery.class);
     maintenance = createMock(MaintenanceController.class);
@@ -212,6 +215,7 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
             TaskTestUtil.CONFIGURATION_MANAGER,
             THRESHOLDS,
             storageUtil.storage,
+            snapshotStore,
             backup,
             recovery,
             cronJobManager,
@@ -1105,10 +1109,10 @@ public class SchedulerThriftInterfaceTest extends 
EasyMockTest {
 
   @Test
   public void testSnapshot() throws Exception {
-    storageUtil.storage.snapshot();
+    snapshotStore.snapshot();
     expectLastCall();
 
-    storageUtil.storage.snapshot();
+    snapshotStore.snapshot();
     expectLastCall().andThrow(new StorageException("mock error!"));
 
     control.replay();

http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java 
b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index b2c371c..bb0fd89 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -62,6 +62,7 @@ import org.apache.aurora.scheduler.quota.QuotaModule;
 import org.apache.aurora.scheduler.resources.ResourceTestUtil;
 import org.apache.aurora.scheduler.resources.ResourceType;
 import org.apache.aurora.scheduler.stats.StatsModule;
+import org.apache.aurora.scheduler.storage.DistributedSnapshotStore;
 import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
 import org.apache.aurora.scheduler.storage.backup.Recovery;
@@ -135,6 +136,7 @@ public class ThriftIT extends EasyMockTest {
             bind(FrameworkInfoFactoryImpl.class).in(Singleton.class);
             bindMock(Recovery.class);
             bindMock(StorageBackup.class);
+            bindMock(DistributedSnapshotStore.class);
             bind(IServerInfo.class).toInstance(SERVER_INFO);
           }
 

Reply via email to