Repository: aurora
Updated Branches:
  refs/heads/master 773d2d6ab -> 9b9b2ee7b


Add a test for storage durability

Reviewed at https://reviews.apache.org/r/63670/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/9b9b2ee7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/9b9b2ee7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/9b9b2ee7

Branch: refs/heads/master
Commit: 9b9b2ee7b52e5299494828a0fa7fc793c9364de7
Parents: 773d2d6
Author: Bill Farner <wfar...@apache.org>
Authored: Thu Nov 9 09:26:50 2017 -0800
Committer: Bill Farner <wfar...@apache.org>
Committed: Thu Nov 9 09:26:50 2017 -0800

----------------------------------------------------------------------
 .../org/apache/aurora/scheduler/log/Log.java    |   2 +-
 .../aurora/scheduler/log/mesos/MesosLog.java    |   6 -
 .../aurora/scheduler/app/SchedulerIT.java       |  20 +--
 .../scheduler/log/mesos/MesosLogTest.java       |  14 --
 .../aurora/scheduler/storage/log/FakeLog.java   |  68 ++++++++
 .../storage/log/NonVolatileStorageTest.java     | 158 +++++++++++++++++++
 6 files changed, 228 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/main/java/org/apache/aurora/scheduler/log/Log.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/Log.java 
b/src/main/java/org/apache/aurora/scheduler/log/Log.java
index dc77eb4..2a76144 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/Log.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/Log.java
@@ -28,7 +28,7 @@ public interface Log {
   /**
    * An opaque ordered handle to a log entry's position in the log stream.
    */
-  interface Position extends Comparable<Position> {
+  interface Position {
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java 
b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
index 21855e1..bcdd459 100644
--- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
+++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java
@@ -376,12 +376,6 @@ public class MesosLog implements 
org.apache.aurora.scheduler.log.Log {
       Log.Position unwrap() {
         return underlying;
       }
-
-      @Override
-      public int compareTo(Position o) {
-        Preconditions.checkArgument(o instanceof LogPosition);
-        return underlying.compareTo(((LogPosition) o).underlying);
-      }
     }
 
     private static class LogEntry implements 
org.apache.aurora.scheduler.log.Log.Entry {

http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java 
b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 67a0d5a..4929ecd 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -22,7 +22,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Optional;
@@ -272,23 +271,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
     }).get();
   }
 
-  private final AtomicInteger curPosition = new AtomicInteger();
-  private static class IntPosition implements Position {
-    private final int pos;
-
-    IntPosition(int pos) {
-      this.pos = pos;
-    }
-
-    @Override
-    public int compareTo(Position position) {
-      return pos - ((IntPosition) position).pos;
-    }
-  }
-  private IntPosition nextPosition() {
-    return new IntPosition(curPosition.incrementAndGet());
-  }
-
   private Iterable<Entry> toEntries(LogEntry... entries) {
     return Iterables.transform(Arrays.asList(entries),
         entry -> {
@@ -337,7 +319,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
     expect(log.open()).andReturn(logStream);
     
expect(logStream.readAll()).andReturn(recoveredEntries.iterator()).anyTimes();
     streamMatcher.expectTransaction(Op.saveFrameworkId(new 
SaveFrameworkId(FRAMEWORK_ID)))
-        .andReturn(nextPosition());
+        .andReturn(new Position() { });
 
     CountDownLatch driverStarted = new CountDownLatch(1);
     expect(driver.start()).andAnswer(() -> {

http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java 
b/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java
index f142f54..3e6d0d3 100644
--- a/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSortedSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -274,17 +273,4 @@ public class MesosLogTest extends EasyMockTest {
     // So close!  The implementation requires that hasNext() is called first.
     logStream.readAll().next();
   }
-
-  @Test
-  public void testSortOrder() throws Exception {
-    control.replay();
-
-    LogPosition a = new LogPosition(makePosition(5));
-    LogPosition b = new LogPosition(makePosition(10));
-    LogPosition c = new LogPosition(makePosition(3));
-    assertEquals(
-        ImmutableList.of(c, a, b),
-        ImmutableList.copyOf(ImmutableSortedSet.of(a, b, c))
-    );
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java 
b/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java
new file mode 100644
index 0000000..b62d26e
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NavigableMap;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+
+import org.apache.aurora.scheduler.log.Log;
+
+public class FakeLog implements Log {
+  private final NavigableMap<Long, byte[]> data = Maps.newTreeMap();
+
+  @Override
+  public Stream open() throws IOException {
+    return new FakeStream();
+  }
+
+  private static class LongPosition implements Position {
+    private final long position;
+
+    LongPosition(long position) {
+      this.position = position;
+    }
+  }
+
+  private class FakeStream implements Stream {
+    @Override
+    public Position append(byte[] contents) throws StreamAccessException {
+      long position = data.isEmpty() ? 1 : data.lastKey() + 1;
+      data.put(position, contents);
+      return new LongPosition(position);
+    }
+
+    @Override
+    public Iterator<Entry> readAll() throws StreamAccessException {
+      return Iterators.transform(data.values().iterator(), e -> new Entry() {
+        @Override
+        public byte[] contents() {
+          return e;
+        }
+      });
+    }
+
+    @Override
+    public void truncateBefore(Position position) {
+      if (!(position instanceof LongPosition)) {
+        throw new IllegalArgumentException("Wrong position type");
+      }
+
+      data.headMap(((LongPosition) position).position).clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
 
b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
new file mode 100644
index 0000000..f43a836
--- /dev/null
+++ 
b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.log;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.TypeLiteral;
+
+import org.apache.aurora.common.application.ShutdownRegistry;
+import 
org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl;
+import org.apache.aurora.common.collections.Pair;
+import org.apache.aurora.common.inject.Bindings;
+import org.apache.aurora.common.quantity.Data;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.stats.StatsProvider;
+import org.apache.aurora.common.testing.TearDownTestCase;
+import org.apache.aurora.common.util.BuildInfo;
+import org.apache.aurora.common.util.Clock;
+import org.apache.aurora.common.util.testing.FakeBuildInfo;
+import org.apache.aurora.common.util.testing.FakeClock;
+import org.apache.aurora.gen.storage.Snapshot;
+import org.apache.aurora.scheduler.TierModule;
+import org.apache.aurora.scheduler.config.types.DataAmount;
+import org.apache.aurora.scheduler.config.types.TimeAmount;
+import org.apache.aurora.scheduler.events.EventSink;
+import org.apache.aurora.scheduler.log.Log;
+import org.apache.aurora.scheduler.resources.ResourceTestUtil;
+import org.apache.aurora.scheduler.storage.SnapshotStore;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet;
+import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Volatile;
+import org.apache.aurora.scheduler.storage.entities.IResourceAggregate;
+import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options;
+import org.apache.aurora.scheduler.storage.mem.MemStorageModule;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class NonVolatileStorageTest extends TearDownTestCase {
+
+  private FakeLog log;
+  private Runnable teardown = () -> { };
+  private NonVolatileStorage storage;
+
+  @Before
+  public void setUp() {
+    log = new FakeLog();
+    resetStorage();
+    addTearDown(teardown::run);
+  }
+
+  private void resetStorage() {
+    teardown.run();
+
+    Options options = new Options();
+    options.maxLogEntrySize = new DataAmount(1, Data.GB);
+    options.snapshotInterval = new TimeAmount(1, Time.DAYS);
+
+    ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl();
+    Injector injector = Guice.createInjector(
+        new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)),
+        new LogStorageModule(options),
+        new TierModule(new TierModule.Options()),
+        new AbstractModule() {
+          @Override
+          protected void configure() {
+            bind(Clock.class).toInstance(new FakeClock());
+            
bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo());
+            bind(EventSink.class).toInstance(e -> { });
+            bind(ShutdownRegistry.class).toInstance(shutdownRegistry);
+            bind(StatsProvider.class).toInstance(new FakeStatsProvider());
+            bind(Log.class).toInstance(log);
+            bind(new TypeLiteral<SnapshotStore<Snapshot>>() { 
}).to(SnapshotStoreImpl.class);
+          }
+        }
+    );
+    storage = injector.getInstance(NonVolatileStorage.class);
+    storage.prepare();
+    storage.start(w -> { });
+
+    teardown = () -> {
+      storage.stop();
+      shutdownRegistry.execute();
+    };
+  }
+
+  @Test
+  public void testDurability() {
+    List<Pair<Quiet, Consumer<StoreProvider>>> transactions = 
Lists.newArrayList();
+
+    IResourceAggregate quota = ResourceTestUtil.aggregate(2.0, 2048, 1024);
+    transactions.add(Pair.of(
+        stores -> {
+          stores.getQuotaStore().saveQuota("lucy", quota);
+        },
+        stores -> {
+          assertEquals(Optional.of(quota), 
stores.getQuotaStore().fetchQuota("lucy"));
+        }
+    ));
+    IResourceAggregate quota2 = ResourceTestUtil.aggregate(2.0, 2048, 1024);
+    transactions.add(Pair.of(
+        stores -> {
+          stores.getQuotaStore().saveQuota("lucy", quota2);
+        },
+        stores -> {
+          assertEquals(Optional.of(quota2), 
stores.getQuotaStore().fetchQuota("lucy"));
+        }
+    ));
+    transactions.add(Pair.of(
+        stores -> {
+          stores.getQuotaStore().removeQuota("lucy");
+        },
+        stores -> {
+          assertEquals(Optional.absent(), 
stores.getQuotaStore().fetchQuota("lucy"));
+        }
+    ));
+
+    // Walk through each transaction, simulating a storage stop/reload.
+    transactions.stream()
+        .forEach(transaction -> {
+          storage.write(transaction.getFirst());
+
+          resetStorage();
+          storage.read(stores -> {
+            transaction.getSecond().accept(stores);
+            return null;
+          });
+
+          // Result should survive another reset.
+          storage.snapshot();
+          resetStorage();
+          storage.read(stores -> {
+            transaction.getSecond().accept(stores);
+            return null;
+          });
+        });
+  }
+}

Reply via email to