Repository: aurora Updated Branches: refs/heads/master 773d2d6ab -> 9b9b2ee7b
Add a test for storage durability Reviewed at https://reviews.apache.org/r/63670/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/9b9b2ee7 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/9b9b2ee7 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/9b9b2ee7 Branch: refs/heads/master Commit: 9b9b2ee7b52e5299494828a0fa7fc793c9364de7 Parents: 773d2d6 Author: Bill Farner <wfar...@apache.org> Authored: Thu Nov 9 09:26:50 2017 -0800 Committer: Bill Farner <wfar...@apache.org> Committed: Thu Nov 9 09:26:50 2017 -0800 ---------------------------------------------------------------------- .../org/apache/aurora/scheduler/log/Log.java | 2 +- .../aurora/scheduler/log/mesos/MesosLog.java | 6 - .../aurora/scheduler/app/SchedulerIT.java | 20 +-- .../scheduler/log/mesos/MesosLogTest.java | 14 -- .../aurora/scheduler/storage/log/FakeLog.java | 68 ++++++++ .../storage/log/NonVolatileStorageTest.java | 158 +++++++++++++++++++ 6 files changed, 228 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/main/java/org/apache/aurora/scheduler/log/Log.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/Log.java b/src/main/java/org/apache/aurora/scheduler/log/Log.java index dc77eb4..2a76144 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/Log.java +++ b/src/main/java/org/apache/aurora/scheduler/log/Log.java @@ -28,7 +28,7 @@ public interface Log { /** * An opaque ordered handle to a log entry's position in the log stream. */ - interface Position extends Comparable<Position> { + interface Position { } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java index 21855e1..bcdd459 100644 --- a/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java +++ b/src/main/java/org/apache/aurora/scheduler/log/mesos/MesosLog.java @@ -376,12 +376,6 @@ public class MesosLog implements org.apache.aurora.scheduler.log.Log { Log.Position unwrap() { return underlying; } - - @Override - public int compareTo(Position o) { - Preconditions.checkArgument(o instanceof LogPosition); - return underlying.compareTo(((LogPosition) o).underlying); - } } private static class LogEntry implements org.apache.aurora.scheduler.log.Log.Entry { http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 67a0d5a..4929ecd 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -22,7 +22,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Optional; @@ -272,23 +271,6 @@ public class SchedulerIT extends BaseZooKeeperTest { }).get(); } - private final AtomicInteger curPosition = new AtomicInteger(); - private static class IntPosition implements Position { - private final int pos; - - IntPosition(int pos) { - this.pos = pos; - } - - @Override - public int compareTo(Position position) { - return pos - ((IntPosition) position).pos; - } - } - private IntPosition nextPosition() { - return new IntPosition(curPosition.incrementAndGet()); - } - private Iterable<Entry> toEntries(LogEntry... entries) { return Iterables.transform(Arrays.asList(entries), entry -> { @@ -337,7 +319,7 @@ public class SchedulerIT extends BaseZooKeeperTest { expect(log.open()).andReturn(logStream); expect(logStream.readAll()).andReturn(recoveredEntries.iterator()).anyTimes(); streamMatcher.expectTransaction(Op.saveFrameworkId(new SaveFrameworkId(FRAMEWORK_ID))) - .andReturn(nextPosition()); + .andReturn(new Position() { }); CountDownLatch driverStarted = new CountDownLatch(1); expect(driver.start()).andAnswer(() -> { http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java b/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java index f142f54..3e6d0d3 100644 --- a/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java +++ b/src/test/java/org/apache/aurora/scheduler/log/mesos/MesosLogTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.TimeoutException; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSortedSet; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -274,17 +273,4 @@ public class MesosLogTest extends EasyMockTest { // So close! The implementation requires that hasNext() is called first. logStream.readAll().next(); } - - @Test - public void testSortOrder() throws Exception { - control.replay(); - - LogPosition a = new LogPosition(makePosition(5)); - LogPosition b = new LogPosition(makePosition(10)); - LogPosition c = new LogPosition(makePosition(3)); - assertEquals( - ImmutableList.of(c, a, b), - ImmutableList.copyOf(ImmutableSortedSet.of(a, b, c)) - ); - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java b/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java new file mode 100644 index 0000000..b62d26e --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/FakeLog.java @@ -0,0 +1,68 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.log; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NavigableMap; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; + +import org.apache.aurora.scheduler.log.Log; + +public class FakeLog implements Log { + private final NavigableMap<Long, byte[]> data = Maps.newTreeMap(); + + @Override + public Stream open() throws IOException { + return new FakeStream(); + } + + private static class LongPosition implements Position { + private final long position; + + LongPosition(long position) { + this.position = position; + } + } + + private class FakeStream implements Stream { + @Override + public Position append(byte[] contents) throws StreamAccessException { + long position = data.isEmpty() ? 1 : data.lastKey() + 1; + data.put(position, contents); + return new LongPosition(position); + } + + @Override + public Iterator<Entry> readAll() throws StreamAccessException { + return Iterators.transform(data.values().iterator(), e -> new Entry() { + @Override + public byte[] contents() { + return e; + } + }); + } + + @Override + public void truncateBefore(Position position) { + if (!(position instanceof LongPosition)) { + throw new IllegalArgumentException("Wrong position type"); + } + + data.headMap(((LongPosition) position).position).clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/9b9b2ee7/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java new file mode 100644 index 0000000..f43a836 --- /dev/null +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/NonVolatileStorageTest.java @@ -0,0 +1,158 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.scheduler.storage.log; + +import java.util.List; +import java.util.function.Consumer; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.TypeLiteral; + +import org.apache.aurora.common.application.ShutdownRegistry; +import org.apache.aurora.common.application.ShutdownRegistry.ShutdownRegistryImpl; +import org.apache.aurora.common.collections.Pair; +import org.apache.aurora.common.inject.Bindings; +import org.apache.aurora.common.quantity.Data; +import org.apache.aurora.common.quantity.Time; +import org.apache.aurora.common.stats.StatsProvider; +import org.apache.aurora.common.testing.TearDownTestCase; +import org.apache.aurora.common.util.BuildInfo; +import org.apache.aurora.common.util.Clock; +import org.apache.aurora.common.util.testing.FakeBuildInfo; +import org.apache.aurora.common.util.testing.FakeClock; +import org.apache.aurora.gen.storage.Snapshot; +import org.apache.aurora.scheduler.TierModule; +import org.apache.aurora.scheduler.config.types.DataAmount; +import org.apache.aurora.scheduler.config.types.TimeAmount; +import org.apache.aurora.scheduler.events.EventSink; +import org.apache.aurora.scheduler.log.Log; +import org.apache.aurora.scheduler.resources.ResourceTestUtil; +import org.apache.aurora.scheduler.storage.SnapshotStore; +import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult.Quiet; +import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; +import org.apache.aurora.scheduler.storage.Storage.StoreProvider; +import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; +import org.apache.aurora.scheduler.storage.log.LogStorageModule.Options; +import org.apache.aurora.scheduler.storage.mem.MemStorageModule; +import org.apache.aurora.scheduler.testing.FakeStatsProvider; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class NonVolatileStorageTest extends TearDownTestCase { + + private FakeLog log; + private Runnable teardown = () -> { }; + private NonVolatileStorage storage; + + @Before + public void setUp() { + log = new FakeLog(); + resetStorage(); + addTearDown(teardown::run); + } + + private void resetStorage() { + teardown.run(); + + Options options = new Options(); + options.maxLogEntrySize = new DataAmount(1, Data.GB); + options.snapshotInterval = new TimeAmount(1, Time.DAYS); + + ShutdownRegistryImpl shutdownRegistry = new ShutdownRegistryImpl(); + Injector injector = Guice.createInjector( + new MemStorageModule(Bindings.annotatedKeyFactory(Volatile.class)), + new LogStorageModule(options), + new TierModule(new TierModule.Options()), + new AbstractModule() { + @Override + protected void configure() { + bind(Clock.class).toInstance(new FakeClock()); + bind(BuildInfo.class).toInstance(FakeBuildInfo.generateBuildInfo()); + bind(EventSink.class).toInstance(e -> { }); + bind(ShutdownRegistry.class).toInstance(shutdownRegistry); + bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Log.class).toInstance(log); + bind(new TypeLiteral<SnapshotStore<Snapshot>>() { }).to(SnapshotStoreImpl.class); + } + } + ); + storage = injector.getInstance(NonVolatileStorage.class); + storage.prepare(); + storage.start(w -> { }); + + teardown = () -> { + storage.stop(); + shutdownRegistry.execute(); + }; + } + + @Test + public void testDurability() { + List<Pair<Quiet, Consumer<StoreProvider>>> transactions = Lists.newArrayList(); + + IResourceAggregate quota = ResourceTestUtil.aggregate(2.0, 2048, 1024); + transactions.add(Pair.of( + stores -> { + stores.getQuotaStore().saveQuota("lucy", quota); + }, + stores -> { + assertEquals(Optional.of(quota), stores.getQuotaStore().fetchQuota("lucy")); + } + )); + IResourceAggregate quota2 = ResourceTestUtil.aggregate(2.0, 2048, 1024); + transactions.add(Pair.of( + stores -> { + stores.getQuotaStore().saveQuota("lucy", quota2); + }, + stores -> { + assertEquals(Optional.of(quota2), stores.getQuotaStore().fetchQuota("lucy")); + } + )); + transactions.add(Pair.of( + stores -> { + stores.getQuotaStore().removeQuota("lucy"); + }, + stores -> { + assertEquals(Optional.absent(), stores.getQuotaStore().fetchQuota("lucy")); + } + )); + + // Walk through each transaction, simulating a storage stop/reload. + transactions.stream() + .forEach(transaction -> { + storage.write(transaction.getFirst()); + + resetStorage(); + storage.read(stores -> { + transaction.getSecond().accept(stores); + return null; + }); + + // Result should survive another reset. + storage.snapshot(); + resetStorage(); + storage.read(stores -> { + transaction.getSecond().accept(stores); + return null; + }); + }); + } +}