http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java deleted file mode 100644 index 07b4bdb..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ /dev/null @@ -1,576 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import java.io.IOException; -import java.util.Date; -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; - -import javax.inject.Inject; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.MoreExecutors; - -import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; -import org.apache.aurora.common.application.ShutdownRegistry; -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.common.stats.SlidingStats; -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.storage.LogEntry; -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.SaveCronJob; -import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; -import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveQuota; -import org.apache.aurora.gen.storage.Snapshot; -import org.apache.aurora.scheduler.base.AsyncUtil; -import org.apache.aurora.scheduler.base.SchedulerException; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.log.Log.Stream.InvalidPositionException; -import org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.QuotaStore; -import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.SnapshotStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; -import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.util.Objects.requireNonNull; - -/** - * A storage implementation that ensures committed transactions are written to a log. - * - * <p>In the classic write-ahead log usage we'd perform mutations as follows: - * <ol> - * <li>write op to log</li> - * <li>perform op locally</li> - * <li>*checkpoint</li> - * </ol> - * - * <p>Writing the operation to the log provides us with a fast persistence mechanism to ensure we - * have a record of our mutation in case we should need to recover state later after a crash or on - * a new host (assuming the log is distributed). We then apply the mutation to a local (in-memory) - * data structure for serving fast read requests and then optionally write down the position of the - * log entry we wrote in the first step to stable storage to allow for quicker recovery after a - * crash. Instead of reading the whole log, we can read all entries past the checkpoint. This - * design implies that all mutations must be idempotent and free from constraint and thus - * replayable over newer operations when recovering from an old checkpoint. - * - * <p>The important detail in our case is the possibility of writing an op to the log, and then - * failing to commit locally since we use a local database instead of an in-memory data structure. - * If we die after such a failure, then another instance can read and apply the logged op - * erroneously. - * - * <p>This implementation leverages a local transaction to handle this: - * <ol> - * <li>start local transaction</li> - * <li>perform op locally (uncommitted!)</li> - * <li>write op to log</li> - * <li>commit local transaction</li> - * <li>*checkpoint</li> - * </ol> - * - * <p>If the op fails to apply to local storage we will never write the op to the log and if the op - * fails to apply to the log, it'll throw and abort the local storage transaction as well. - */ -public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore { - - /** - * A service that can schedule an action to be executed periodically. - */ - @VisibleForTesting - interface SchedulingService { - - /** - * Schedules an action to execute periodically. - * - * @param interval The time period to wait until running the {@code action} again. - * @param action The action to execute periodically. - */ - void doEvery(Amount<Long, Time> interval, Runnable action); - } - - /** - * A maintainer for context about open transactions. Assumes that an external entity is - * responsible for opening and closing transactions. - */ - interface TransactionManager { - - /** - * Checks whether there is an open transaction. - * - * @return {@code true} if there is an open transaction, {@code false} otherwise. - */ - boolean hasActiveTransaction(); - - /** - * Adds an operation to the existing transaction. - * - * @param op Operation to include in the existing transaction. - */ - void log(Op op); - } - - private static class ScheduledExecutorSchedulingService implements SchedulingService { - private final ScheduledExecutorService scheduledExecutor; - - ScheduledExecutorSchedulingService(ShutdownRegistry shutdownRegistry, - Amount<Long, Time> shutdownGracePeriod) { - scheduledExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("LogStorage-%d", LOG); - shutdownRegistry.addAction(() -> MoreExecutors.shutdownAndAwaitTermination( - scheduledExecutor, - shutdownGracePeriod.getValue(), - shutdownGracePeriod.getUnit().getTimeUnit())); - } - - @Override - public void doEvery(Amount<Long, Time> interval, Runnable action) { - requireNonNull(interval); - requireNonNull(action); - - long delay = interval.getValue(); - TimeUnit timeUnit = interval.getUnit().getTimeUnit(); - scheduledExecutor.scheduleWithFixedDelay(action, delay, delay, timeUnit); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(LogStorage.class); - - private final LogManager logManager; - private final SchedulingService schedulingService; - private final SnapshotStore<Snapshot> snapshotStore; - private final Amount<Long, Time> snapshotInterval; - private final Storage writeBehindStorage; - private final SchedulerStore.Mutable writeBehindSchedulerStore; - private final CronJobStore.Mutable writeBehindJobStore; - private final TaskStore.Mutable writeBehindTaskStore; - private final QuotaStore.Mutable writeBehindQuotaStore; - private final AttributeStore.Mutable writeBehindAttributeStore; - private final JobUpdateStore.Mutable writeBehindJobUpdateStore; - private final ReentrantLock writeLock; - private final ThriftBackfill thriftBackfill; - - private StreamManager streamManager; - private final WriteAheadStorage writeAheadStorage; - - // TODO(wfarner): It should be possible to remove this flag now, since all call stacks when - // recovering are controlled at this layer (they're all calls to Mutable store implementations). - // The more involved change is changing SnapshotStore to accept a Mutable store provider to - // avoid a call to Storage.write() when we replay a Snapshot. - private boolean recovered = false; - private StreamTransaction transaction = null; - - private final SlidingStats writerWaitStats = - new SlidingStats("log_storage_write_lock_wait", "ns"); - - private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions; - private final Map<Op._Fields, Consumer<Op>> transactionReplayActions; - - @Inject - LogStorage( - LogManager logManager, - ShutdownRegistry shutdownRegistry, - Settings settings, - SnapshotStore<Snapshot> snapshotStore, - @Volatile Storage storage, - @Volatile SchedulerStore.Mutable schedulerStore, - @Volatile CronJobStore.Mutable jobStore, - @Volatile TaskStore.Mutable taskStore, - @Volatile QuotaStore.Mutable quotaStore, - @Volatile AttributeStore.Mutable attributeStore, - @Volatile JobUpdateStore.Mutable jobUpdateStore, - EventSink eventSink, - ReentrantLock writeLock, - ThriftBackfill thriftBackfill) { - - this(logManager, - new ScheduledExecutorSchedulingService(shutdownRegistry, settings.getShutdownGracePeriod()), - snapshotStore, - settings.getSnapshotInterval(), - storage, - schedulerStore, - jobStore, - taskStore, - quotaStore, - attributeStore, - jobUpdateStore, - eventSink, - writeLock, - thriftBackfill); - } - - @VisibleForTesting - LogStorage( - LogManager logManager, - SchedulingService schedulingService, - SnapshotStore<Snapshot> snapshotStore, - Amount<Long, Time> snapshotInterval, - Storage delegateStorage, - SchedulerStore.Mutable schedulerStore, - CronJobStore.Mutable jobStore, - TaskStore.Mutable taskStore, - QuotaStore.Mutable quotaStore, - AttributeStore.Mutable attributeStore, - JobUpdateStore.Mutable jobUpdateStore, - EventSink eventSink, - ReentrantLock writeLock, - ThriftBackfill thriftBackfill) { - - this.logManager = requireNonNull(logManager); - this.schedulingService = requireNonNull(schedulingService); - this.snapshotStore = requireNonNull(snapshotStore); - this.snapshotInterval = requireNonNull(snapshotInterval); - - // Log storage has two distinct operating modes: pre- and post-recovery. When recovering, - // we write directly to the writeBehind stores since we are replaying what's already persisted. - // After that, all writes must succeed in the distributed log before they may be considered - // successful. - this.writeBehindStorage = requireNonNull(delegateStorage); - this.writeBehindSchedulerStore = requireNonNull(schedulerStore); - this.writeBehindJobStore = requireNonNull(jobStore); - this.writeBehindTaskStore = requireNonNull(taskStore); - this.writeBehindQuotaStore = requireNonNull(quotaStore); - this.writeBehindAttributeStore = requireNonNull(attributeStore); - this.writeBehindJobUpdateStore = requireNonNull(jobUpdateStore); - this.writeLock = requireNonNull(writeLock); - this.thriftBackfill = requireNonNull(thriftBackfill); - TransactionManager transactionManager = new TransactionManager() { - @Override - public boolean hasActiveTransaction() { - return transaction != null; - } - - @Override - public void log(Op op) { - transaction.add(op); - } - }; - this.writeAheadStorage = new WriteAheadStorage( - transactionManager, - schedulerStore, - jobStore, - taskStore, - quotaStore, - attributeStore, - jobUpdateStore, - LoggerFactory.getLogger(WriteAheadStorage.class), - eventSink); - - this.logEntryReplayActions = buildLogEntryReplayActions(); - this.transactionReplayActions = buildTransactionReplayActions(); - } - - @VisibleForTesting - final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() { - return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder() - .put(LogEntry._Fields.SNAPSHOT, logEntry -> { - Snapshot snapshot = logEntry.getSnapshot(); - LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); - snapshotStore.applySnapshot(snapshot); - }) - .put(LogEntry._Fields.TRANSACTION, logEntry -> write((NoResult.Quiet) unused -> { - for (Op op : logEntry.getTransaction().getOps()) { - replayOp(op); - } - })) - .put(LogEntry._Fields.NOOP, item -> { - // Nothing to do here - }) - .build(); - } - - @VisibleForTesting - final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() { - return ImmutableMap.<Op._Fields, Consumer<Op>>builder() - .put( - Op._Fields.SAVE_FRAMEWORK_ID, - op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId())) - .put(Op._Fields.SAVE_CRON_JOB, op -> { - SaveCronJob cronJob = op.getSaveCronJob(); - writeBehindJobStore.saveAcceptedJob( - thriftBackfill.backfillJobConfiguration(cronJob.getJobConfig())); - }) - .put( - Op._Fields.REMOVE_JOB, - op -> writeBehindJobStore.removeJob(IJobKey.build(op.getRemoveJob().getJobKey()))) - .put( - Op._Fields.SAVE_TASKS, - op -> writeBehindTaskStore.saveTasks( - thriftBackfill.backfillTasks(op.getSaveTasks().getTasks()))) - .put( - Op._Fields.REMOVE_TASKS, - op -> writeBehindTaskStore.deleteTasks(op.getRemoveTasks().getTaskIds())) - .put(Op._Fields.SAVE_QUOTA, op -> { - SaveQuota saveQuota = op.getSaveQuota(); - writeBehindQuotaStore.saveQuota( - saveQuota.getRole(), - ThriftBackfill.backfillResourceAggregate(saveQuota.getQuota())); - }) - .put( - Op._Fields.REMOVE_QUOTA, - op -> writeBehindQuotaStore.removeQuota(op.getRemoveQuota().getRole())) - .put(Op._Fields.SAVE_HOST_ATTRIBUTES, op -> { - HostAttributes attributes = op.getSaveHostAttributes().getHostAttributes(); - // Prior to commit 5cf760b, the store would persist maintenance mode changes for - // unknown hosts. 5cf760b began rejecting these, but the replicated log may still - // contain entries with a null slave ID. - if (attributes.isSetSlaveId()) { - writeBehindAttributeStore.saveHostAttributes(IHostAttributes.build(attributes)); - } else { - LOG.info("Dropping host attributes with no agent ID: " + attributes); - } - }) - .put(Op._Fields.SAVE_JOB_UPDATE, op -> - writeBehindJobUpdateStore.saveJobUpdate( - thriftBackfill.backFillJobUpdate(op.getSaveJobUpdate().getJobUpdate()))) - .put(Op._Fields.SAVE_JOB_UPDATE_EVENT, op -> { - SaveJobUpdateEvent event = op.getSaveJobUpdateEvent(); - writeBehindJobUpdateStore.saveJobUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobUpdateEvent.build(op.getSaveJobUpdateEvent().getEvent())); - }) - .put(Op._Fields.SAVE_JOB_INSTANCE_UPDATE_EVENT, op -> { - SaveJobInstanceUpdateEvent event = op.getSaveJobInstanceUpdateEvent(); - writeBehindJobUpdateStore.saveJobInstanceUpdateEvent( - IJobUpdateKey.build(event.getKey()), - IJobInstanceUpdateEvent.build(op.getSaveJobInstanceUpdateEvent().getEvent())); - }) - .put(Op._Fields.PRUNE_JOB_UPDATE_HISTORY, op -> { - LOG.info("Dropping prune operation. Updates will be pruned later."); - }) - .put(Op._Fields.REMOVE_JOB_UPDATE, op -> - writeBehindJobUpdateStore.removeJobUpdates( - IJobUpdateKey.setFromBuilders(op.getRemoveJobUpdate().getKeys()))) - .build(); - } - - @Override - @Timed("scheduler_storage_prepare") - public synchronized void prepare() { - writeBehindStorage.prepare(); - // Open the log to make a log replica available to the scheduler group. - try { - streamManager = logManager.open(); - } catch (IOException e) { - throw new IllegalStateException("Failed to open the log, cannot continue", e); - } - } - - @Override - @Timed("scheduler_storage_start") - public synchronized void start(final MutateWork.NoResult.Quiet initializationLogic) { - write((NoResult.Quiet) unused -> { - // Must have the underlying storage started so we can query it for the last checkpoint. - // We replay these entries in the forwarded storage system's transactions but not ours - we - // do not want to re-record these ops to the log. - recover(); - recovered = true; - - // Now that we're recovered we should let any mutations done in initializationLogic append - // to the log, so run it in one of our transactions. - write(initializationLogic); - }); - - scheduleSnapshots(); - } - - @Override - public void stop() { - // No-op. - } - - @Timed("scheduler_log_recover") - void recover() throws RecoveryFailedException { - try { - streamManager.readFromBeginning(LogStorage.this::replay); - } catch (CodingException | InvalidPositionException | StreamAccessException e) { - throw new RecoveryFailedException(e); - } - } - - private static final class RecoveryFailedException extends SchedulerException { - RecoveryFailedException(Throwable cause) { - super(cause); - } - } - - private void replay(final LogEntry logEntry) { - LogEntry._Fields entryField = logEntry.getSetField(); - if (!logEntryReplayActions.containsKey(entryField)) { - throw new IllegalStateException("Unknown log entry type: " + entryField); - } - - logEntryReplayActions.get(entryField).accept(logEntry); - } - - private void replayOp(Op op) { - Op._Fields opField = op.getSetField(); - if (!transactionReplayActions.containsKey(opField)) { - throw new IllegalStateException("Unknown transaction op: " + opField); - } - - transactionReplayActions.get(opField).accept(op); - } - - private void scheduleSnapshots() { - if (snapshotInterval.getValue() > 0) { - schedulingService.doEvery(snapshotInterval, () -> { - try { - snapshot(); - } catch (StorageException e) { - if (e.getCause() == null) { - LOG.warn("StorageException when attempting to snapshot.", e); - } else { - LOG.warn(e.getMessage(), e.getCause()); - } - } - }); - } - } - - /** - * Forces a snapshot of the storage state. - * - * @throws CodingException If there is a problem encoding the snapshot. - * @throws InvalidPositionException If the log stream cursor is invalid. - * @throws StreamAccessException If there is a problem writing the snapshot to the log stream. - */ - @Timed("scheduler_log_snapshot") - void doSnapshot() throws CodingException, InvalidPositionException, StreamAccessException { - write((NoResult<CodingException>) (MutableStoreProvider unused) -> { - LOG.info("Creating snapshot."); - Snapshot snapshot = snapshotStore.createSnapshot(); - persist(snapshot); - LOG.info("Snapshot complete." - + " host attrs: " + snapshot.getHostAttributesSize() - + ", cron jobs: " + snapshot.getCronJobsSize() - + ", quota confs: " + snapshot.getQuotaConfigurationsSize() - + ", tasks: " + snapshot.getTasksSize() - + ", updates: " + snapshot.getJobUpdateDetailsSize()); - }); - } - - @Timed("scheduler_log_snapshot_persist") - @Override - public void persist(Snapshot snapshot) - throws CodingException, InvalidPositionException, StreamAccessException { - - streamManager.snapshot(snapshot); - } - - private <T, E extends Exception> T doInTransaction(final MutateWork<T, E> work) - throws StorageException, E { - - // The log stream transaction has already been set up so we just need to delegate with our - // store provider so any mutations performed by work get logged. - if (transaction != null) { - return work.apply(writeAheadStorage); - } - - transaction = streamManager.startTransaction(); - try { - return writeBehindStorage.write(unused -> { - T result = work.apply(writeAheadStorage); - try { - transaction.commit(); - } catch (CodingException e) { - throw new IllegalStateException( - "Problem encoding transaction operations to the log stream", e); - } catch (StreamAccessException e) { - throw new StorageException( - "There was a problem committing the transaction to the log.", e); - } - return result; - }); - } finally { - transaction = null; - } - } - - @Override - public <T, E extends Exception> T write(final MutateWork<T, E> work) throws StorageException, E { - long waitStart = System.nanoTime(); - writeLock.lock(); - try { - writerWaitStats.accumulate(System.nanoTime() - waitStart); - // We don't want to use the log when recovering from it, we just want to update the underlying - // store - so pass mutations straight through to the underlying storage. - if (!recovered) { - return writeBehindStorage.write(work); - } - - return doInTransaction(work); - } finally { - writeLock.unlock(); - } - } - - @Override - public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E { - return writeBehindStorage.read(work); - } - - @Override - public void snapshot() throws StorageException { - try { - doSnapshot(); - } catch (CodingException e) { - throw new StorageException("Failed to encode a snapshot", e); - } catch (InvalidPositionException e) { - throw new StorageException("Saved snapshot but failed to truncate entries preceding it", e); - } catch (StreamAccessException e) { - throw new StorageException("Failed to create a snapshot", e); - } - } - - /** - * Configuration settings for log storage. - */ - public static class Settings { - private final Amount<Long, Time> shutdownGracePeriod; - private final Amount<Long, Time> snapshotInterval; - - public Settings(Amount<Long, Time> shutdownGracePeriod, Amount<Long, Time> snapshotInterval) { - this.shutdownGracePeriod = requireNonNull(shutdownGracePeriod); - this.snapshotInterval = requireNonNull(snapshotInterval); - } - - public Amount<Long, Time> getShutdownGracePeriod() { - return shutdownGracePeriod; - } - - public Amount<Long, Time> getSnapshotInterval() { - return snapshotInterval; - } - } -}
http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java index c8dc7ad..75ec42a 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorageModule.java @@ -32,8 +32,10 @@ import org.apache.aurora.scheduler.storage.CallOrderEnforcingStorage; import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; +import org.apache.aurora.scheduler.storage.durability.DurableStorage; +import org.apache.aurora.scheduler.storage.durability.Persistence; import org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize; -import org.apache.aurora.scheduler.storage.log.LogStorage.Settings; +import org.apache.aurora.scheduler.storage.log.LogPersistence.Settings; import static org.apache.aurora.scheduler.storage.log.EntrySerializer.EntrySerializerImpl; import static org.apache.aurora.scheduler.storage.log.LogManager.LogEntryHashFunction; @@ -77,10 +79,13 @@ public class LogStorageModule extends PrivateModule { bind(new TypeLiteral<Amount<Integer, Data>>() { }).annotatedWith(MaxEntrySize.class) .toInstance(options.maxLogEntrySize); bind(LogManager.class).in(Singleton.class); - bind(LogStorage.class).in(Singleton.class); + bind(DurableStorage.class).in(Singleton.class); - install(CallOrderEnforcingStorage.wrappingModule(LogStorage.class)); - bind(DistributedSnapshotStore.class).to(LogStorage.class); + install(CallOrderEnforcingStorage.wrappingModule(DurableStorage.class)); + bind(LogPersistence.class).in(Singleton.class); + bind(Persistence.class).to(LogPersistence.class); + bind(DistributedSnapshotStore.class).to(LogPersistence.class); + expose(Persistence.class); expose(Storage.class); expose(NonVolatileStorage.class); expose(DistributedSnapshotStore.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java index 5859f80..739fad7 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/SnapshotStoreImpl.java @@ -48,6 +48,7 @@ import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.Volatile; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IHostAttributes; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java index ea147c0..18da32d 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java @@ -13,7 +13,7 @@ */ package org.apache.aurora.scheduler.storage.log; -import java.util.function.Consumer; +import java.util.Iterator; import org.apache.aurora.gen.storage.LogEntry; import org.apache.aurora.gen.storage.Snapshot; @@ -25,23 +25,21 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; /** * Manages interaction with the log stream. Log entries can be - * {@link #readFromBeginning(Consumer) read from} the beginning, + * {@link #readFromBeginning() read from} the beginning, * a {@link #startTransaction() transaction} consisting of one or more local storage * operations can be committed atomically, or the log can be compacted by * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}. */ public interface StreamManager { /** - * Reads all entries in the log stream after the given position. If the position - * supplied is {@code null} then all log entries in the stream will be read. + * Reads all entries in the log stream. * - * @param reader A reader that will be handed log entries decoded from the stream. + * @return All stored log entries. * @throws CodingException if there was a problem decoding a log entry from the stream. * @throws InvalidPositionException if the given position is not found in the log. * @throws StreamAccessException if there is a problem reading from the log. */ - void readFromBeginning(Consumer<LogEntry> reader) - throws CodingException, InvalidPositionException, StreamAccessException; + Iterator<LogEntry> readFromBeginning() throws CodingException, StreamAccessException; /** * Truncates all entries in the log stream occuring before the given position. The entry at the @@ -54,8 +52,7 @@ public interface StreamManager { void truncateBefore(Log.Position position); /** - * Starts a transaction that can be used to commit a series of {@link Op}s to the log stream - * atomically. + * Starts a transaction that can be used to commit a series of ops to the log stream atomically. * * @return StreamTransaction A transaction manager to handle batching up commits to the * underlying stream. http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java index baf2647..c5b107f 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java @@ -19,12 +19,12 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; import javax.annotation.Nullable; import javax.inject.Inject; import com.google.common.base.Preconditions; +import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; @@ -95,31 +95,37 @@ class StreamManagerImpl implements StreamManager { } @Override - public void readFromBeginning(Consumer<LogEntry> reader) + public Iterator<LogEntry> readFromBeginning() throws CodingException, InvalidPositionException, StreamAccessException { Iterator<Log.Entry> entries = stream.readAll(); - while (entries.hasNext()) { - LogEntry logEntry = decodeLogEntry(entries.next()); - while (logEntry != null && isFrame(logEntry)) { - logEntry = tryDecodeFrame(logEntry.getFrame(), entries); - } - if (logEntry != null) { - if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) { - logEntry = Entries.inflate(logEntry); - vars.deflatedEntriesRead.incrementAndGet(); - } - - if (logEntry.isSetDeduplicatedSnapshot()) { - logEntry = LogEntry.snapshot( - snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot())); + return new AbstractIterator<LogEntry>() { + @Override + protected LogEntry computeNext() { + while (entries.hasNext()) { + LogEntry logEntry = decodeLogEntry(entries.next()); + while (logEntry != null && isFrame(logEntry)) { + logEntry = tryDecodeFrame(logEntry.getFrame(), entries); + } + if (logEntry != null) { + if (logEntry.isSet(LogEntry._Fields.DEFLATED_ENTRY)) { + logEntry = Entries.inflate(logEntry); + vars.deflatedEntriesRead.incrementAndGet(); + } + + if (logEntry.isSetDeduplicatedSnapshot()) { + logEntry = LogEntry.snapshot( + snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot())); + } + + vars.entriesRead.incrementAndGet(); + return logEntry; + } } - - reader.accept(logEntry); - vars.entriesRead.incrementAndGet(); + return endOfData(); } - } + }; } @Nullable http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java b/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java deleted file mode 100644 index 92b64bb..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/ThriftBackfill.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import java.util.EnumSet; -import java.util.Set; -import java.util.stream.Collectors; - -import com.google.inject.Inject; - -import org.apache.aurora.GuavaUtils; -import org.apache.aurora.gen.JobConfiguration; -import org.apache.aurora.gen.JobUpdate; -import org.apache.aurora.gen.JobUpdateInstructions; -import org.apache.aurora.gen.Resource; -import org.apache.aurora.gen.ResourceAggregate; -import org.apache.aurora.gen.ScheduledTask; -import org.apache.aurora.gen.TaskConfig; -import org.apache.aurora.scheduler.TierInfo; -import org.apache.aurora.scheduler.TierManager; -import org.apache.aurora.scheduler.quota.QuotaManager; -import org.apache.aurora.scheduler.resources.ResourceType; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IResource; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.apache.aurora.scheduler.storage.entities.ITaskConfig; - -import static java.lang.String.format; -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.scheduler.resources.ResourceType.CPUS; -import static org.apache.aurora.scheduler.resources.ResourceType.DISK_MB; -import static org.apache.aurora.scheduler.resources.ResourceType.RAM_MB; - -/** - * Helps migrating thrift schema by populating deprecated and/or replacement fields. - */ -public final class ThriftBackfill { - - private final TierManager tierManager; - - @Inject - public ThriftBackfill(TierManager tierManager) { - this.tierManager = requireNonNull(tierManager); - } - - private static Resource getResource(Set<Resource> resources, ResourceType type) { - return resources.stream() - .filter(e -> ResourceType.fromResource(IResource.build(e)).equals(type)) - .findFirst() - .orElseThrow(() -> - new IllegalArgumentException("Missing resource definition for " + type)); - } - - /** - * Ensures TaskConfig.resources and correspondent task-level fields are all populated. - * - * @param config TaskConfig to backfill. - * @return Backfilled TaskConfig. - */ - public TaskConfig backfillTask(TaskConfig config) { - backfillTier(config); - return config; - } - - private void backfillTier(TaskConfig config) { - ITaskConfig taskConfig = ITaskConfig.build(config); - if (config.isSetTier()) { - TierInfo tier = tierManager.getTier(taskConfig); - config.setProduction(!tier.isPreemptible() && !tier.isRevocable()); - } else { - config.setTier(tierManager.getTiers() - .entrySet() - .stream() - .filter(e -> e.getValue().isPreemptible() == !taskConfig.isProduction() - && !e.getValue().isRevocable()) - .findFirst() - .orElseThrow(() -> new IllegalStateException( - format("No matching implicit tier for task of job %s", taskConfig.getJob()))) - .getKey()); - } - } - - /** - * Backfills JobConfiguration. See {@link #backfillTask(TaskConfig)}. - * - * @param jobConfig JobConfiguration to backfill. - * @return Backfilled JobConfiguration. - */ - public IJobConfiguration backfillJobConfiguration(JobConfiguration jobConfig) { - backfillTask(jobConfig.getTaskConfig()); - return IJobConfiguration.build(jobConfig); - } - - /** - * Backfills set of tasks. See {@link #backfillTask(TaskConfig)}. - * - * @param tasks Set of tasks to backfill. - * @return Backfilled set of tasks. - */ - public Set<IScheduledTask> backfillTasks(Set<ScheduledTask> tasks) { - return tasks.stream() - .map(t -> backfillScheduledTask(t)) - .map(IScheduledTask::build) - .collect(GuavaUtils.toImmutableSet()); - } - - /** - * Ensures ResourceAggregate.resources and correspondent deprecated fields are all populated. - * - * @param aggregate ResourceAggregate to backfill. - * @return Backfilled IResourceAggregate. - */ - public static IResourceAggregate backfillResourceAggregate(ResourceAggregate aggregate) { - if (!aggregate.isSetResources() || aggregate.getResources().isEmpty()) { - aggregate.addToResources(Resource.numCpus(aggregate.getNumCpus())); - aggregate.addToResources(Resource.ramMb(aggregate.getRamMb())); - aggregate.addToResources(Resource.diskMb(aggregate.getDiskMb())); - } else { - EnumSet<ResourceType> quotaResources = QuotaManager.QUOTA_RESOURCE_TYPES; - if (aggregate.getResources().size() > quotaResources.size()) { - throw new IllegalArgumentException("Too many resource values in quota."); - } - - if (!quotaResources.equals(aggregate.getResources().stream() - .map(e -> ResourceType.fromResource(IResource.build(e))) - .collect(Collectors.toSet()))) { - - throw new IllegalArgumentException("Quota resources must be exactly: " + quotaResources); - } - aggregate.setNumCpus( - getResource(aggregate.getResources(), CPUS).getNumCpus()); - aggregate.setRamMb( - getResource(aggregate.getResources(), RAM_MB).getRamMb()); - aggregate.setDiskMb( - getResource(aggregate.getResources(), DISK_MB).getDiskMb()); - } - return IResourceAggregate.build(aggregate); - } - - private ScheduledTask backfillScheduledTask(ScheduledTask task) { - backfillTask(task.getAssignedTask().getTask()); - return task; - } - - /** - * Backfills JobUpdate. See {@link #backfillTask(TaskConfig)}. - * - * @param update JobUpdate to backfill. - * @return Backfilled job update. - */ - IJobUpdate backFillJobUpdate(JobUpdate update) { - JobUpdateInstructions instructions = update.getInstructions(); - if (instructions.isSetDesiredState()) { - backfillTask(instructions.getDesiredState().getTask()); - } - - instructions.getInitialState().forEach(e -> backfillTask(e.getTask())); - - return IJobUpdate.build(update); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java deleted file mode 100644 index 41061f8..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/WriteAheadStorage.java +++ /dev/null @@ -1,369 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import java.util.List; -import java.util.Map; -import java.util.Set; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.RemoveJob; -import org.apache.aurora.gen.storage.RemoveQuota; -import org.apache.aurora.gen.storage.RemoveTasks; -import org.apache.aurora.gen.storage.SaveCronJob; -import org.apache.aurora.gen.storage.SaveFrameworkId; -import org.apache.aurora.gen.storage.SaveHostAttributes; -import org.apache.aurora.gen.storage.SaveJobInstanceUpdateEvent; -import org.apache.aurora.gen.storage.SaveJobUpdate; -import org.apache.aurora.gen.storage.SaveJobUpdateEvent; -import org.apache.aurora.gen.storage.SaveQuota; -import org.apache.aurora.gen.storage.SaveTasks; -import org.apache.aurora.scheduler.base.Query; -import org.apache.aurora.scheduler.events.EventSink; -import org.apache.aurora.scheduler.events.PubsubEvent; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.QuotaStore; -import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; -import org.apache.aurora.scheduler.storage.entities.IResourceAggregate; -import org.apache.aurora.scheduler.storage.entities.IScheduledTask; -import org.slf4j.Logger; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.scheduler.storage.log.LogStorage.TransactionManager; - -/** - * Mutable stores implementation that translates all operations to {@link Op}s (which are passed - * to a provided {@link TransactionManager}) before forwarding the operations to delegate mutable - * stores. - */ -class WriteAheadStorage implements - MutableStoreProvider, - SchedulerStore.Mutable, - CronJobStore.Mutable, - TaskStore.Mutable, - QuotaStore.Mutable, - AttributeStore.Mutable, - JobUpdateStore.Mutable { - - private final TransactionManager transactionManager; - private final SchedulerStore.Mutable schedulerStore; - private final CronJobStore.Mutable jobStore; - private final TaskStore.Mutable taskStore; - private final QuotaStore.Mutable quotaStore; - private final AttributeStore.Mutable attributeStore; - private final JobUpdateStore.Mutable jobUpdateStore; - private final Logger log; - private final EventSink eventSink; - - /** - * Creates a new write-ahead storage that delegates to the providing default stores. - * - * @param transactionManager External controller for transaction operations. - * @param schedulerStore Delegate. - * @param jobStore Delegate. - * @param taskStore Delegate. - * @param quotaStore Delegate. - * @param attributeStore Delegate. - * @param jobUpdateStore Delegate. - */ - WriteAheadStorage( - TransactionManager transactionManager, - SchedulerStore.Mutable schedulerStore, - CronJobStore.Mutable jobStore, - TaskStore.Mutable taskStore, - QuotaStore.Mutable quotaStore, - AttributeStore.Mutable attributeStore, - JobUpdateStore.Mutable jobUpdateStore, - Logger log, - EventSink eventSink) { - - this.transactionManager = requireNonNull(transactionManager); - this.schedulerStore = requireNonNull(schedulerStore); - this.jobStore = requireNonNull(jobStore); - this.taskStore = requireNonNull(taskStore); - this.quotaStore = requireNonNull(quotaStore); - this.attributeStore = requireNonNull(attributeStore); - this.jobUpdateStore = requireNonNull(jobUpdateStore); - this.log = requireNonNull(log); - this.eventSink = requireNonNull(eventSink); - } - - private void write(Op op) { - Preconditions.checkState( - transactionManager.hasActiveTransaction(), - "Mutating operations must be within a transaction."); - transactionManager.log(op); - } - - @Override - public void saveFrameworkId(final String frameworkId) { - requireNonNull(frameworkId); - - write(Op.saveFrameworkId(new SaveFrameworkId(frameworkId))); - schedulerStore.saveFrameworkId(frameworkId); - } - - @Override - public void deleteTasks(final Set<String> taskIds) { - requireNonNull(taskIds); - - write(Op.removeTasks(new RemoveTasks(taskIds))); - taskStore.deleteTasks(taskIds); - } - - @Override - public void saveTasks(final Set<IScheduledTask> newTasks) { - requireNonNull(newTasks); - - write(Op.saveTasks(new SaveTasks(IScheduledTask.toBuildersSet(newTasks)))); - taskStore.saveTasks(newTasks); - } - - @Override - public Optional<IScheduledTask> mutateTask( - String taskId, - Function<IScheduledTask, IScheduledTask> mutator) { - - Optional<IScheduledTask> mutated = taskStore.mutateTask(taskId, mutator); - log.debug("Storing updated task to log: {}={}", taskId, mutated.get().getStatus()); - write(Op.saveTasks(new SaveTasks(ImmutableSet.of(mutated.get().newBuilder())))); - - return mutated; - } - - @Override - public void saveQuota(final String role, final IResourceAggregate quota) { - requireNonNull(role); - requireNonNull(quota); - - write(Op.saveQuota(new SaveQuota(role, quota.newBuilder()))); - quotaStore.saveQuota(role, quota); - } - - @Override - public boolean saveHostAttributes(final IHostAttributes attrs) { - requireNonNull(attrs); - - boolean changed = attributeStore.saveHostAttributes(attrs); - if (changed) { - write(Op.saveHostAttributes(new SaveHostAttributes(attrs.newBuilder()))); - eventSink.post(new PubsubEvent.HostAttributesChanged(attrs)); - } - return changed; - } - - @Override - public void removeJob(final IJobKey jobKey) { - requireNonNull(jobKey); - - write(Op.removeJob(new RemoveJob().setJobKey(jobKey.newBuilder()))); - jobStore.removeJob(jobKey); - } - - @Override - public void saveAcceptedJob(final IJobConfiguration jobConfig) { - requireNonNull(jobConfig); - - write(Op.saveCronJob(new SaveCronJob(jobConfig.newBuilder()))); - jobStore.saveAcceptedJob(jobConfig); - } - - @Override - public void removeQuota(final String role) { - requireNonNull(role); - - write(Op.removeQuota(new RemoveQuota(role))); - quotaStore.removeQuota(role); - } - - @Override - public void saveJobUpdate(IJobUpdate update) { - requireNonNull(update); - - write(Op.saveJobUpdate(new SaveJobUpdate().setJobUpdate(update.newBuilder()))); - jobUpdateStore.saveJobUpdate(update); - } - - @Override - public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { - requireNonNull(key); - requireNonNull(event); - - write(Op.saveJobUpdateEvent(new SaveJobUpdateEvent(event.newBuilder(), key.newBuilder()))); - jobUpdateStore.saveJobUpdateEvent(key, event); - } - - @Override - public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { - requireNonNull(key); - requireNonNull(event); - - write(Op.saveJobInstanceUpdateEvent( - new SaveJobInstanceUpdateEvent(event.newBuilder(), key.newBuilder()))); - jobUpdateStore.saveJobInstanceUpdateEvent(key, event); - } - - @Override - public void removeJobUpdates(Set<IJobUpdateKey> keys) { - requireNonNull(keys); - - // Compatibility mode - RemoveJobUpdates is not yet written since older versions cannot - // read it. JobUpdates are only removed implicitly when a snapshot is taken. - jobUpdateStore.removeJobUpdates(keys); - } - - @Override - public void deleteAllTasks() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteHostAttributes() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteJobs() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteQuotas() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public void deleteAllUpdates() { - throw new UnsupportedOperationException( - "Unsupported since casual storage users should never be doing this."); - } - - @Override - public SchedulerStore.Mutable getSchedulerStore() { - return this; - } - - @Override - public CronJobStore.Mutable getCronJobStore() { - return this; - } - - @Override - public TaskStore.Mutable getUnsafeTaskStore() { - return this; - } - - @Override - public QuotaStore.Mutable getQuotaStore() { - return this; - } - - @Override - public AttributeStore.Mutable getAttributeStore() { - return this; - } - - @Override - public TaskStore getTaskStore() { - return this; - } - - @Override - public JobUpdateStore.Mutable getJobUpdateStore() { - return this; - } - - @Override - public Optional<String> fetchFrameworkId() { - return this.schedulerStore.fetchFrameworkId(); - } - - @Override - public Iterable<IJobConfiguration> fetchJobs() { - return this.jobStore.fetchJobs(); - } - - @Override - public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { - return this.jobStore.fetchJob(jobKey); - } - - @Override - public Optional<IScheduledTask> fetchTask(String taskId) { - return this.taskStore.fetchTask(taskId); - } - - @Override - public Iterable<IScheduledTask> fetchTasks(Query.Builder query) { - return this.taskStore.fetchTasks(query); - } - - @Override - public Set<IJobKey> getJobKeys() { - return this.taskStore.getJobKeys(); - } - - @Override - public Optional<IResourceAggregate> fetchQuota(String role) { - return this.quotaStore.fetchQuota(role); - } - - @Override - public Map<String, IResourceAggregate> fetchQuotas() { - return this.quotaStore.fetchQuotas(); - } - - @Override - public Optional<IHostAttributes> getHostAttributes(String host) { - return this.attributeStore.getHostAttributes(host); - } - - @Override - public Set<IHostAttributes> getHostAttributes() { - return this.attributeStore.getHostAttributes(); - } - - @Override - public List<IJobUpdateDetails> fetchJobUpdates(IJobUpdateQuery query) { - return this.jobUpdateStore.fetchJobUpdates(query); - } - - @Override - public Optional<IJobUpdateDetails> fetchJobUpdate(IJobUpdateKey key) { - return this.jobUpdateStore.fetchJobUpdate(key); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java index 2cc567d..a519b07 100644 --- a/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java +++ b/src/main/java/org/apache/aurora/scheduler/thrift/SchedulerThriftInterface.java @@ -83,11 +83,13 @@ import org.apache.aurora.scheduler.state.MaintenanceController; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.state.UUIDGenerator; +import org.apache.aurora.scheduler.storage.DistributedSnapshotStore; import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult; import org.apache.aurora.scheduler.storage.Storage.NonVolatileStorage; import org.apache.aurora.scheduler.storage.Storage.StoreProvider; import org.apache.aurora.scheduler.storage.backup.Recovery; import org.apache.aurora.scheduler.storage.backup.StorageBackup; +import org.apache.aurora.scheduler.storage.durability.ThriftBackfill; import org.apache.aurora.scheduler.storage.entities.IHostStatus; import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; import org.apache.aurora.scheduler.storage.entities.IJobKey; @@ -99,7 +101,6 @@ import org.apache.aurora.scheduler.storage.entities.IMetadata; import org.apache.aurora.scheduler.storage.entities.IRange; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.entities.ITaskConfig; -import org.apache.aurora.scheduler.storage.log.ThriftBackfill; import org.apache.aurora.scheduler.thrift.aop.AnnotatedAuroraAdmin; import org.apache.aurora.scheduler.thrift.aop.ThriftWorkload; import org.apache.aurora.scheduler.thrift.auth.DecoratedThrift; @@ -167,6 +168,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { private final ConfigurationManager configurationManager; private final Thresholds thresholds; private final NonVolatileStorage storage; + private final DistributedSnapshotStore snapshotStore; private final StorageBackup backup; private final Recovery recovery; private final MaintenanceController maintenance; @@ -195,6 +197,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { ConfigurationManager configurationManager, Thresholds thresholds, NonVolatileStorage storage, + DistributedSnapshotStore snapshotStore, StorageBackup backup, Recovery recovery, CronJobManager cronJobManager, @@ -211,6 +214,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { this.configurationManager = requireNonNull(configurationManager); this.thresholds = requireNonNull(thresholds); this.storage = requireNonNull(storage); + this.snapshotStore = requireNonNull(snapshotStore); this.backup = requireNonNull(backup); this.recovery = requireNonNull(recovery); this.maintenance = requireNonNull(maintenance); @@ -635,7 +639,7 @@ class SchedulerThriftInterface implements AnnotatedAuroraAdmin { @Override public Response snapshot() { - storage.snapshot(); + snapshotStore.snapshot(); return ok(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java index 8cf6871..e82b637 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java @@ -36,11 +36,6 @@ public class FakeNonVolatileStorage implements NonVolatileStorage { } @Override - public void snapshot() throws StorageException { - // No-op. - } - - @Override public void start(Quiet initializationLogic) throws StorageException { // No-op. } http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java index c639ab6..aeb8685 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java +++ b/src/test/java/org/apache/aurora/scheduler/app/local/LocalSchedulerMain.java @@ -26,6 +26,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.util.Modules; +import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.app.SchedulerMain; import org.apache.aurora.scheduler.app.local.simulator.ClusterSimulatorModule; @@ -82,7 +83,17 @@ public final class LocalSchedulerMain { protected void configure() { bind(Storage.class).to(Key.get(Storage.class, Storage.Volatile.class)); bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class); - bind(DistributedSnapshotStore.class).toInstance(snapshot -> { }); + bind(DistributedSnapshotStore.class).toInstance(new DistributedSnapshotStore() { + @Override + public void snapshot() throws Storage.StorageException { + // no-op + } + + @Override + public void snapshotWith(Snapshot snapshot) { + // no-op + } + }); } }; http://git-wip-us.apache.org/repos/asf/aurora/blob/cea43db9/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java index 7138d6b..09560f4 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/backup/RecoveryTest.java @@ -98,7 +98,7 @@ public class RecoveryTest extends EasyMockTest { Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); Capture<Snapshot> snapshot = createCapture(); - distributedStore.persist(capture(snapshot)); + distributedStore.snapshotWith(capture(snapshot)); shutDownNow.execute(); control.replay(); @@ -127,7 +127,7 @@ public class RecoveryTest extends EasyMockTest { Capture<MutateWork<Object, Exception>> transaction = createCapture(); expect(primaryStorage.write(capture(transaction))).andReturn(null); Capture<Snapshot> snapshot = createCapture(); - distributedStore.persist(capture(snapshot)); + distributedStore.snapshotWith(capture(snapshot)); shutDownNow.execute(); control.replay();