Remove the internal SQL database Reviewed at https://reviews.apache.org/r/63743/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/94276046 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/94276046 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/94276046 Branch: refs/heads/master Commit: 94276046606da4e1491ee3d0e0c29cd3649a82e6 Parents: e0624b2 Author: Bill Farner <wfar...@apache.org> Authored: Mon Nov 13 13:19:28 2017 -0800 Committer: Bill Farner <wfar...@apache.org> Committed: Mon Nov 13 13:19:28 2017 -0800 ---------------------------------------------------------------------- RELEASE-NOTES.md | 5 + .../thrift/org/apache/aurora/gen/storage.thrift | 2 +- build.gradle | 6 - .../aurora/benchmark/SchedulingBenchmarks.java | 74 ++- .../aurora/benchmark/TaskStoreBenchmarks.java | 26 - .../aurora/scheduler/async/AsyncModule.java | 25 +- .../aurora/scheduler/async/DelayExecutor.java | 33 - .../aurora/scheduler/async/GatedWorkQueue.java | 41 -- .../scheduler/async/GatingDelayExecutor.java | 98 --- .../aurora/scheduler/http/H2ConsoleModule.java | 63 -- .../http/api/security/HttpSecurityModule.java | 11 +- .../aurora/scheduler/offers/Deferment.java | 10 +- .../scheduler/pruning/TaskHistoryPruner.java | 12 +- .../scheduler/reconciliation/KillRetry.java | 11 +- .../scheduler/reconciliation/TaskTimeout.java | 14 +- .../aurora/scheduler/scheduling/TaskGroups.java | 9 +- .../scheduler/scheduling/TaskThrottler.java | 15 +- .../scheduler/storage/db/AttributeMapper.java | 83 --- .../scheduler/storage/db/CronJobMapper.java | 40 -- .../scheduler/storage/db/DbAttributeStore.java | 95 --- .../scheduler/storage/db/DbCronJobStore.java | 84 --- .../scheduler/storage/db/DbJobUpdateStore.java | 268 --------- .../scheduler/storage/db/DbLockStore.java | 81 --- .../aurora/scheduler/storage/db/DbModule.java | 364 ----------- .../scheduler/storage/db/DbQuotaStore.java | 82 --- .../scheduler/storage/db/DbSchedulerStore.java | 48 -- .../aurora/scheduler/storage/db/DbStorage.java | 242 -------- .../scheduler/storage/db/DbTaskStore.java | 207 ------- .../aurora/scheduler/storage/db/DbUtil.java | 76 --- .../scheduler/storage/db/EnumBackfill.java | 75 --- .../scheduler/storage/db/EnumValueMapper.java | 31 - .../scheduler/storage/db/FrameworkIdMapper.java | 26 - .../storage/db/GarbageCollectedTableMapper.java | 33 - .../scheduler/storage/db/InsertResult.java | 36 -- .../storage/db/InstrumentingInterceptor.java | 139 ----- .../db/JobInstanceUpdateEventMapper.java | 34 -- .../scheduler/storage/db/JobKeyMapper.java | 36 -- .../storage/db/JobUpdateDetailsMapper.java | 210 ------- .../storage/db/JobUpdateEventMapper.java | 34 -- .../scheduler/storage/db/LockKeyMapper.java | 49 -- .../aurora/scheduler/storage/db/LockMapper.java | 53 -- .../scheduler/storage/db/MigrationManager.java | 29 - .../storage/db/MigrationManagerImpl.java | 134 ----- .../scheduler/storage/db/MigrationMapper.java | 51 -- .../scheduler/storage/db/MyBatisCacheImpl.java | 119 ---- .../scheduler/storage/db/PruneVictim.java | 40 -- .../scheduler/storage/db/QuotaMapper.java | 79 --- .../storage/db/RowGarbageCollector.java | 99 --- .../scheduler/storage/db/TaskConfigManager.java | 161 ----- .../scheduler/storage/db/TaskConfigMapper.java | 210 ------- .../aurora/scheduler/storage/db/TaskMapper.java | 99 --- .../migration/V001_CreateAppcImagesTable.java | 46 -- .../migration/V002_CreateDockerImagesTable.java | 46 -- .../V003_CreateResourceTypesTable.java | 56 -- .../migration/V004_CreateTaskResourceTable.java | 74 --- .../V005_CreateQuotaResourceTable.java | 68 --- .../db/migration/V006_PopulateTierField.java | 51 -- .../V007_CreateMesosFetcherURIsTable.java | 46 -- .../V008_CreateUpdateMetadataTable.java | 45 -- .../V009_CreateContainerVolumesTable.java | 53 -- .../migration/V010_RemoveUniqueConstraint.java | 41 -- .../typehandlers/AbstractTEnumTypeHandler.java | 70 --- .../CronCollisionPolicyTypeHandler.java | 26 - .../JobUpdateActionTypeHandler.java | 26 - .../JobUpdateStatusTypeHandler.java | 26 - .../MaintenanceModeTypeHandler.java | 26 - .../db/typehandlers/ResourceTypeHandler.java | 26 - .../typehandlers/ScheduleStatusTypeHandler.java | 26 - .../storage/db/typehandlers/TypeHandlers.java | 41 -- .../db/typehandlers/VolumeModeTypeHandler.java | 23 - .../scheduler/storage/db/views/DBResource.java | 32 - .../storage/db/views/DBResourceAggregate.java | 56 -- .../scheduler/storage/db/views/DBSaveQuota.java | 29 - .../storage/db/views/DbAssginedPort.java | 30 - .../storage/db/views/DbAssignedTask.java | 48 -- .../storage/db/views/DbConstraint.java | 30 - .../scheduler/storage/db/views/DbContainer.java | 38 -- .../scheduler/storage/db/views/DbImage.java | 38 -- .../storage/db/views/DbInstanceTaskConfig.java | 33 - .../storage/db/views/DbJobConfiguration.java | 43 -- .../scheduler/storage/db/views/DbJobUpdate.java | 36 -- .../storage/db/views/DbJobUpdateDetails.java | 33 - .../db/views/DbJobUpdateInstructions.java | 45 -- .../storage/db/views/DbScheduledTask.java | 54 -- .../db/views/DbStoredJobUpdateDetails.java | 30 - .../storage/db/views/DbTaskConfig.java | 101 ---- .../storage/db/views/DbTaskConstraint.java | 45 -- .../scheduler/storage/db/views/LockRow.java | 46 -- .../db/views/MigrationChangelogEntry.java | 48 -- .../scheduler/storage/db/views/Pairs.java | 38 -- .../storage/log/SnapshotStoreImpl.java | 86 --- .../scheduler/storage/db/AttributeMapper.xml | 90 --- .../scheduler/storage/db/CronJobMapper.xml | 109 ---- .../scheduler/storage/db/EnumValueMapper.xml | 15 - .../scheduler/storage/db/FrameworkIdMapper.xml | 32 - .../storage/db/JobInstanceUpdateEventMapper.xml | 33 - .../scheduler/storage/db/JobKeyMapper.xml | 47 -- .../storage/db/JobUpdateDetailsMapper.xml | 598 ------------------- .../storage/db/JobUpdateEventMapper.xml | 35 -- .../aurora/scheduler/storage/db/LockMapper.xml | 83 --- .../scheduler/storage/db/MigrationMapper.xml | 55 -- .../aurora/scheduler/storage/db/QuotaMapper.xml | 91 --- .../scheduler/storage/db/TaskConfigMapper.xml | 460 -------------- .../aurora/scheduler/storage/db/TaskMapper.xml | 241 -------- .../aurora/scheduler/storage/db/schema.sql | 392 ------------ .../aurora/scheduler/async/AsyncModuleTest.java | 3 +- .../async/GatingDelayExecutorTest.java | 151 ----- .../scheduler/http/H2ConsoleModuleIT.java | 40 -- .../http/api/security/HttpSecurityIT.java | 48 -- .../scheduler/offers/OfferManagerImplTest.java | 6 +- .../pruning/TaskHistoryPrunerTest.java | 15 +- .../scheduler/reconciliation/KillRetryTest.java | 9 +- .../reconciliation/TaskTimeoutTest.java | 13 +- .../scheduler/scheduling/TaskGroupsTest.java | 6 +- .../scheduler/scheduling/TaskThrottlerTest.java | 14 +- .../scheduler/storage/backup/RecoveryTest.java | 2 - .../storage/db/AttributeStoreTest.java | 24 - .../scheduler/storage/db/CronJobStoreTest.java | 39 -- .../scheduler/storage/db/DbStorageTest.java | 117 ---- .../db/InstrumentingInterceptorTest.java | 162 ----- .../storage/db/JobUpdateStoreTest.java | 25 - .../scheduler/storage/db/LockStoreTest.java | 24 - .../storage/db/MigrationManagerImplIT.java | 157 ----- .../storage/db/MyBatisCacheImplTest.java | 52 -- .../scheduler/storage/db/QuotaStoreTest.java | 24 - .../storage/db/RowGarbageCollectorTest.java | 113 ---- .../storage/db/SchedulerStoreTest.java | 24 - .../scheduler/storage/db/TaskStoreTest.java | 39 -- .../db/testmigration/V001_TestMigration.java | 40 -- .../db/testmigration/V002_TestMigration2.java | 40 -- .../storage/log/SnapshotStoreImplIT.java | 93 +-- .../storage/mem/MemCronJobStoreTest.java | 10 - .../scheduler/storage/mem/MemTaskStoreTest.java | 10 - .../testing/FakeScheduledExecutor.java | 15 +- 134 files changed, 162 insertions(+), 9091 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/RELEASE-NOTES.md ---------------------------------------------------------------------- diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 92f9c98..d653b79 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -5,6 +5,11 @@ - Updated to Mesos 1.4.0. +### Deprecations and removals: + +- Removed the ability to recover from SQL-based backups and snapshots. An 0.20.0 scheduler + will not be able to recover backups or replicated log data created prior to 0.19.0. + 0.19.0 ====== http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/api/src/main/thrift/org/apache/aurora/gen/storage.thrift ---------------------------------------------------------------------- diff --git a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift index ccb5825..74983ba 100644 --- a/api/src/main/thrift/org/apache/aurora/gen/storage.thrift +++ b/api/src/main/thrift/org/apache/aurora/gen/storage.thrift @@ -144,7 +144,7 @@ struct Snapshot { 8: set<QuotaConfiguration> quotaConfigurations 9: set<api.Lock> locks 10: set<StoredJobUpdateDetails> jobUpdateDetails - 11: list<string> dbScript + //11: removed //12: removed } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index aa416b7..af11991 100644 --- a/build.gradle +++ b/build.gradle @@ -87,7 +87,6 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 ext.jerseyRev = '1.19' ext.junitRev = '4.12' ext.logbackRev = '1.2.3' - ext.mybatisRev = '3.4.1' ext.nettyRev = '4.0.52.Final' ext.protobufRev = '3.3.0' ext.servletRev = '3.1.0' @@ -120,7 +119,6 @@ For more details, please see https://issues.apache.org/jira/browse/AURORA-1169 force "org.apache.zookeeper:zookeeper:${zookeeperRev}" force "org.hamcrest:hamcrest-core:1.3" force "org.slf4j:slf4j-api:${slf4jRev}" - force "org.mybatis:mybatis:${mybatisRev}" } } } @@ -373,7 +371,6 @@ dependencies { compile "com.google.inject:guice:${guiceRev}" compile "com.google.inject.extensions:guice-assistedinject:${guiceRev}" compile "com.google.protobuf:protobuf-java:${protobufRev}" - compile 'com.h2database:h2:1.4.196' compile 'com.hubspot.jackson:jackson-datatype-protobuf:0.9.3' compile "com.fasterxml.jackson.core:jackson-core:${jacksonRev}" compile "com.sun.jersey:jersey-core:${jerseyRev}" @@ -396,9 +393,6 @@ dependencies { compile "org.eclipse.jetty:jetty-server:${jettyDep}" compile "org.eclipse.jetty:jetty-servlet:${jettyDep}" compile "org.eclipse.jetty:jetty-servlets:${jettyDep}" - compile "org.mybatis:mybatis:${mybatisRev}" - compile 'org.mybatis:mybatis-guice:3.7' - compile 'org.mybatis:mybatis-migrations:3.2.0' compile 'org.quartz-scheduler:quartz:2.2.2' testCompile "com.sun.jersey:jersey-client:${jerseyRev}" http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java index 292bb29..1708a50 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/SchedulingBenchmarks.java @@ -15,6 +15,10 @@ package org.apache.aurora.benchmark; import java.util.List; import java.util.Set; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.inject.Singleton; @@ -46,7 +50,6 @@ import org.apache.aurora.scheduler.HostOffer; import org.apache.aurora.scheduler.TaskIdGenerator; import org.apache.aurora.scheduler.TierModule; import org.apache.aurora.scheduler.async.AsyncModule; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskTestUtil; import org.apache.aurora.scheduler.config.CliOptions; import org.apache.aurora.scheduler.config.CommandLine; @@ -138,20 +141,11 @@ public class SchedulingBenchmarks { new PrivateModule() { @Override protected void configure() { + // We use a no-op executor for async work, as this benchmark is focused on the // synchronous scheduling operations. - bind(DelayExecutor.class).annotatedWith(AsyncModule.AsyncExecutor.class) - .toInstance(new DelayExecutor() { - @Override - public void execute(Runnable work, Amount<Long, Time> minDelay) { - // No-op. - } - - @Override - public void execute(Runnable command) { - // No-op. - } - }); + bind(ScheduledExecutorService.class).annotatedWith(AsyncModule.AsyncExecutor.class) + .toInstance(new NoopExecutor()); bind(Deferment.class).to(Deferment.Noop.class); bind(OfferManager.class).to(OfferManager.OfferManagerImpl.class); bind(OfferManager.OfferManagerImpl.class).in(Singleton.class); @@ -400,4 +394,58 @@ public class SchedulingBenchmarks { return ImmutableSet.of("" + System.currentTimeMillis()); } } + + private static class NoopExecutor extends AbstractExecutorService + implements ScheduledExecutorService { + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { + return null; + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return null; + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return null; + } + + @Override + public void shutdown() { + } + + @Override + public List<Runnable> shutdownNow() { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public void execute(Runnable command) { + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java index 6f2f9f4..9ec9865 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java +++ b/src/jmh/java/org/apache/aurora/benchmark/TaskStoreBenchmarks.java @@ -27,7 +27,6 @@ import org.apache.aurora.common.util.testing.FakeClock; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.db.DbUtil; import org.apache.aurora.scheduler.storage.entities.IScheduledTask; import org.apache.aurora.scheduler.storage.mem.MemStorageModule; import org.apache.aurora.scheduler.testing.FakeStatsProvider; @@ -111,29 +110,4 @@ public class TaskStoreBenchmarks { storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped()))); } } - - public static class DBFetchTasksBenchmark extends AbstractFetchTasksBenchmark { - @Setup(Level.Trial) - @Override - public void setUp() { - storage = DbUtil.createStorage(); - } - - @Setup(Level.Iteration) - public void setUpIteration() { - createTasks(numTasks); - } - - @TearDown(Level.Iteration) - public void tearDownIteration() { - deleteTasks(); - } - - @Benchmark - public int run() { - // Iterate through results in case the result is lazily computed. - return Iterables.size( - storage.read(store -> store.getTaskStore().fetchTasks(Query.unscoped()))); - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index 68f7ddb..0166d41 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -81,20 +81,15 @@ public class AsyncModule extends AbstractModule { @Override protected void configure() { bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction); - bind(ScheduledExecutorService.class).toInstance(afterTransaction); - - bind(GatingDelayExecutor.class).in(Singleton.class); - expose(GatingDelayExecutor.class); - bind(RegisterGauges.class).in(Singleton.class); expose(RegisterGauges.class); } }); SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class); - bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); - bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); - bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class); + bind(Executor.class).annotatedWith(AsyncExecutor.class).toInstance(afterTransaction); + bind(ScheduledExecutorService.class).annotatedWith(AsyncExecutor.class) + .toInstance(afterTransaction); } static class RegisterGauges extends AbstractIdleService { @@ -104,31 +99,19 @@ public class AsyncModule extends AbstractModule { @VisibleForTesting static final String ASYNC_TASKS_GAUGE = "async_tasks_completed"; - @VisibleForTesting - static final String DELAY_QUEUE_GAUGE = "delay_executor_queue_size"; - private final StatsProvider statsProvider; private final ScheduledThreadPoolExecutor executor; - private final GatingDelayExecutor delayExecutor; @Inject - RegisterGauges( - StatsProvider statsProvider, - ScheduledThreadPoolExecutor executor, - GatingDelayExecutor delayExecutor) { - + RegisterGauges(StatsProvider statsProvider, ScheduledThreadPoolExecutor executor) { this.statsProvider = requireNonNull(statsProvider); this.executor = requireNonNull(executor); - this.delayExecutor = requireNonNull(delayExecutor); } @Override protected void startUp() { statsProvider.makeGauge(TIMEOUT_QUEUE_GAUGE, () -> executor.getQueue().size()); statsProvider.makeGauge(ASYNC_TASKS_GAUGE, executor::getCompletedTaskCount); - // Using a lambda rather than method ref to sidestep a bug in PMD that makes it think - // delayExecutor is unused. - statsProvider.makeGauge(DELAY_QUEUE_GAUGE, delayExecutor::getQueueSize); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java deleted file mode 100644 index c851e5b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.concurrent.Executor; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -/** - * An executor that supports executing work after a minimum time delay. - */ -public interface DelayExecutor extends Executor { - - /** - * Executes {@code work} after no less than {@code minDelay}. - * - * @param work Work to execute. - * @param minDelay Minimum amount of time to wait before executing the work. - */ - void execute(Runnable work, Amount<Long, Time> minDelay); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java deleted file mode 100644 index 7032271..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -/** - * A work queue that only executes pending work when flushed. - */ -public interface GatedWorkQueue { - - /** - * Closes the gate on the work queue for the duration of an operation. - * - * @param operation Operation to execute while keeping the gate closed. - * @param <T> Operation return type. - * @param <E> Operation exception type. - * @return The value returned by the {@code operation}. - * @throws E Exception thrown by the {@code operation}. - */ - <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E; - - /** - * Operation prevents new items from being executed on the work queue. - * - * @param <T> Operation return type. - * @param <E> Operation exception type. - */ - interface GatedOperation<T, E extends Exception> { - T doWithGateClosed() throws E; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java deleted file mode 100644 index a7240ae..0000000 --- a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.async; - -import java.util.Queue; -import java.util.concurrent.ScheduledExecutorService; - -import javax.inject.Inject; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; - -import static java.util.Objects.requireNonNull; - -/** - * An executor that may be temporarily gated with {@link #closeDuring(GatedOperation)}. When the - * executor is gated, newly-submitted work will be enqueued and executed once the gate is opened as - * a result of {@link #closeDuring(GatedOperation)} returning. - */ -class GatingDelayExecutor implements DelayExecutor, GatedWorkQueue { - - private final ScheduledExecutorService gated; - private final Queue<Runnable> queue = Lists.newLinkedList(); - - /** - * Creates a gating delay executor that will gate work from the provided executor. - * - * @param gated Delegate to execute work with when ungated. - */ - @Inject - GatingDelayExecutor(ScheduledExecutorService gated) { - this.gated = requireNonNull(gated); - } - - private final ThreadLocal<Boolean> isOpen = new ThreadLocal<Boolean>() { - @Override - protected Boolean initialValue() { - return true; - } - }; - - @Override - public <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E { - boolean startedOpen = isOpen.get(); - isOpen.set(false); - - try { - return operation.doWithGateClosed(); - } finally { - if (startedOpen) { - isOpen.set(true); - flush(); - } - } - } - - synchronized int getQueueSize() { - return queue.size(); - } - - private synchronized void enqueue(Runnable work) { - if (isOpen.get()) { - work.run(); - } else { - queue.add(work); - } - } - - private synchronized void flush() { - for (Runnable work : Iterables.consumingIterable(queue)) { - work.run(); - } - } - - @Override - public synchronized void execute(Runnable command) { - enqueue(() -> gated.execute(command)); - } - - @Override - public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) { - enqueue(() -> gated.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit())); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java b/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java deleted file mode 100644 index ee10f47..0000000 --- a/src/main/java/org/apache/aurora/scheduler/http/H2ConsoleModule.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.http; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import com.google.inject.servlet.ServletModule; - -import org.h2.server.web.WebServlet; - -/** - * Binding module for the H2 management console. - * <p> - * See: http://www.h2database.com/html/tutorial.html#tutorial_starting_h2_console - */ -public class H2ConsoleModule extends ServletModule { - public static final String H2_PATH = "/h2console"; - public static final String H2_PERM = "h2_management_console"; - - @Parameters(separators = "=") - public static class Options { - @Parameter( - names = "-enable_h2_console", - description = "Enable H2 DB management console.", - arity = 1) - public boolean enableH2Console = false; - } - - private final boolean enabled; - - public H2ConsoleModule(Options options) { - this(options.enableH2Console); - } - - @VisibleForTesting - public H2ConsoleModule(boolean enabled) { - this.enabled = enabled; - } - - @Override - protected void configureServlets() { - if (enabled) { - filter(H2_PATH, H2_PATH + "/*").through(LeaderRedirectFilter.class); - serve(H2_PATH, H2_PATH + "/*").with(new WebServlet(), ImmutableMap.of( - "webAllowOthers", "true", - "ifExists", "true" - )); - } - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java b/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java index 5229450..d81671c 100644 --- a/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java +++ b/src/main/java/org/apache/aurora/scheduler/http/api/security/HttpSecurityModule.java @@ -54,8 +54,6 @@ import org.apache.shiro.subject.Subject; import static java.util.Objects.requireNonNull; -import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PATH; -import static org.apache.aurora.scheduler.http.H2ConsoleModule.H2_PERM; import static org.apache.aurora.scheduler.http.api.ApiModule.API_PATH; import static org.apache.aurora.scheduler.spi.Permissions.Domain.THRIFT_AURORA_ADMIN; import static org.apache.shiro.guice.web.ShiroWebModule.guiceFilterModule; @@ -68,12 +66,9 @@ import static org.apache.shiro.web.filter.authc.AuthenticatingFilter.PERMISSIVE; * included with this package. */ public class HttpSecurityModule extends ServletModule { - public static final String HTTP_REALM_NAME = "Apache Aurora Scheduler"; + private static final String HTTP_REALM_NAME = "Apache Aurora Scheduler"; - private static final String H2_PATTERN = H2_PATH + "/**"; private static final String ALL_PATTERN = "/**"; - private static final Key<? extends Filter> K_STRICT = - Key.get(ShiroKerberosAuthenticationFilter.class); private static final Key<? extends Filter> K_PERMISSIVE = Key.get(ShiroKerberosPermissiveAuthenticationFilter.class); @@ -176,8 +171,6 @@ public class HttpSecurityModule extends ServletModule { } }); install(guiceFilterModule(API_PATH)); - install(guiceFilterModule(H2_PATH)); - install(guiceFilterModule(H2_PATH + "/*")); install(new ShiroWebModule(getServletContext()) { // Replace the ServletContainerSessionManager which causes subject.runAs(...) in a @@ -200,12 +193,10 @@ public class HttpSecurityModule extends ServletModule { // more specific pattern first. switch (mechanism) { case BASIC: - addFilterChain(H2_PATTERN, NO_SESSION_CREATION, AUTHC_BASIC, config(PERMS, H2_PERM)); addFilterChainWithAfterAuthFilter(config(AUTHC_BASIC, PERMISSIVE)); break; case NEGOTIATE: - addFilterChain(H2_PATTERN, NO_SESSION_CREATION, K_STRICT, config(PERMS, H2_PERM)); addFilterChainWithAfterAuthFilter(K_PERMISSIVE); break; http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java index f3ec886..90a4428 100644 --- a/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java +++ b/src/main/java/org/apache/aurora/scheduler/offers/Deferment.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.offers; +import java.util.concurrent.ScheduledExecutorService; + import javax.inject.Inject; import com.google.common.base.Supplier; @@ -20,7 +22,6 @@ import com.google.common.base.Supplier; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import static java.util.Objects.requireNonNull; @@ -51,12 +52,12 @@ public interface Deferment { */ class DelayedDeferment implements Deferment { private final Supplier<Amount<Long, Time>> delay; - private final DelayExecutor executor; + private final ScheduledExecutorService executor; @Inject public DelayedDeferment( Supplier<Amount<Long, Time>> delay, - @AsyncExecutor DelayExecutor executor) { + @AsyncExecutor ScheduledExecutorService executor) { this.delay = requireNonNull(delay); this.executor = requireNonNull(executor); @@ -64,7 +65,8 @@ public interface Deferment { @Override public void defer(Runnable action) { - executor.execute(action, delay.get()); + Amount<Long, Time> actionDelay = delay.get(); + executor.schedule(action, actionDelay.getValue(), actionDelay.getUnit().getTimeUnit()); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java index f778494..3cafbc2 100644 --- a/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java +++ b/src/main/java/org/apache/aurora/scheduler/pruning/TaskHistoryPruner.java @@ -14,6 +14,8 @@ package org.apache.aurora.scheduler.pruning; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -34,7 +36,6 @@ import org.apache.aurora.gen.apiConstants; import org.apache.aurora.scheduler.BatchWorker; import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.state.StateManager; @@ -61,7 +62,7 @@ public class TaskHistoryPruner implements EventSubscriber { @VisibleForTesting static final String TASKS_PRUNED = "tasks_pruned"; - private final DelayExecutor executor; + private final ScheduledExecutorService executor; private final StateManager stateManager; private final Clock clock; private final HistoryPrunnerSettings settings; @@ -96,7 +97,7 @@ public class TaskHistoryPruner implements EventSubscriber { @Inject TaskHistoryPruner( - @AsyncExecutor DelayExecutor executor, + @AsyncExecutor ScheduledExecutorService executor, StateManager stateManager, Clock clock, HistoryPrunnerSettings settings, @@ -161,7 +162,7 @@ public class TaskHistoryPruner implements EventSubscriber { LOG.debug("Prune task {} in {} ms.", taskId, timeRemaining); - executor.execute( + executor.schedule( shutdownOnError( lifecycle, LOG, @@ -170,7 +171,8 @@ public class TaskHistoryPruner implements EventSubscriber { LOG.info("Pruning expired inactive task " + taskId); deleteTasks(ImmutableSet.of(taskId)); }), - Amount.of(timeRemaining, Time.MILLISECONDS)); + timeRemaining, + TimeUnit.MILLISECONDS); executor.execute( shutdownOnError( http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java index 31afa7f..53a3a53 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/KillRetry.java @@ -13,6 +13,8 @@ */ package org.apache.aurora.scheduler.reconciliation; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -21,13 +23,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.eventbus.Subscribe; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.BackoffStrategy; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -50,7 +49,7 @@ public class KillRetry implements EventSubscriber { private final Driver driver; private final Storage storage; - private final DelayExecutor executor; + private final ScheduledExecutorService executor; private final BackoffStrategy backoffStrategy; private final AtomicLong killRetries; @@ -58,7 +57,7 @@ public class KillRetry implements EventSubscriber { KillRetry( Driver driver, Storage storage, - @AsyncExecutor DelayExecutor executor, + @AsyncExecutor ScheduledExecutorService executor, BackoffStrategy backoffStrategy, StatsProvider statsProvider) { @@ -86,7 +85,7 @@ public class KillRetry implements EventSubscriber { void tryLater() { retryInMs.set(backoffStrategy.calculateBackoffMs(retryInMs.get())); - executor.execute(this, Amount.of(retryInMs.get(), Time.MILLISECONDS)); + executor.schedule(this, retryInMs.get(), TimeUnit.MILLISECONDS); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java index 8e9a0d3..9910e77 100644 --- a/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java +++ b/src/main/java/org/apache/aurora/scheduler/reconciliation/TaskTimeout.java @@ -15,6 +15,8 @@ package org.apache.aurora.scheduler.reconciliation; import java.util.EnumSet; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -29,7 +31,6 @@ import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; import org.apache.aurora.scheduler.state.StateChangeResult; @@ -65,7 +66,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { ScheduleStatus.KILLING, ScheduleStatus.DRAINING); - private final DelayExecutor executor; + private final ScheduledExecutorService executor; private final Storage storage; private final StateManager stateManager; private final Amount<Long, Time> timeout; @@ -73,7 +74,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { @Inject TaskTimeout( - @AsyncExecutor DelayExecutor executor, + @AsyncExecutor ScheduledExecutorService executor, Storage storage, StateManager stateManager, Amount<Long, Time> timeout, @@ -140,7 +141,7 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { LOG.debug("Retrying timeout of task {} in {}", taskId, NOT_STARTED_RETRY); // TODO(wfarner): This execution should not wait for a transaction, but a second executor // would be weird. - executor.execute(this, NOT_STARTED_RETRY); + executor.schedule(this, NOT_STARTED_RETRY.as(Time.MILLISECONDS), TimeUnit.MILLISECONDS); } } } @@ -148,9 +149,10 @@ class TaskTimeout extends AbstractIdleService implements EventSubscriber { @Subscribe public void recordStateChange(TaskStateChange change) { if (isTransient(change.getNewState())) { - executor.execute( + executor.schedule( new TimedOutTaskHandler(change.getTaskId(), change.getNewState()), - timeout); + timeout.as(Time.MILLISECONDS), + TimeUnit.MILLISECONDS); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java index 2d3492d..b9987e4 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskGroups.java @@ -19,6 +19,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -39,7 +41,6 @@ import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.common.util.BackoffStrategy; import org.apache.aurora.scheduler.BatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.TaskGroupKey; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; @@ -73,7 +74,7 @@ public class TaskGroups implements EventSubscriber { static final String SCHEDULE_ATTEMPTS_BLOCKS = "schedule_attempts_blocks"; private final ConcurrentMap<TaskGroupKey, TaskGroup> groups = Maps.newConcurrentMap(); - private final DelayExecutor executor; + private final ScheduledExecutorService executor; private final TaskGroupsSettings settings; private final TaskScheduler taskScheduler; private final RescheduleCalculator rescheduleCalculator; @@ -134,7 +135,7 @@ public class TaskGroups implements EventSubscriber { @VisibleForTesting @Inject public TaskGroups( - @AsyncExecutor DelayExecutor executor, + @AsyncExecutor ScheduledExecutorService executor, TaskGroupsSettings settings, TaskScheduler taskScheduler, RescheduleCalculator rescheduleCalculator, @@ -153,7 +154,7 @@ public class TaskGroups implements EventSubscriber { // Avoid check-then-act by holding the intrinsic lock. If not done atomically, we could // remove a group while a task is being added to it. if (group.hasMore()) { - executor.execute(evaluate, Amount.of(group.getPenaltyMs(), Time.MILLISECONDS)); + executor.schedule(evaluate, group.getPenaltyMs(), TimeUnit.MILLISECONDS); } else { groups.remove(group.getKey()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java index 867c9bd..24692b0 100644 --- a/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java +++ b/src/main/java/org/apache/aurora/scheduler/scheduling/TaskThrottler.java @@ -13,19 +13,19 @@ */ package org.apache.aurora.scheduler.scheduling; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import javax.inject.Inject; import com.google.common.base.Optional; import com.google.common.eventbus.Subscribe; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; import org.apache.aurora.common.stats.SlidingStats; import org.apache.aurora.common.util.Clock; import org.apache.aurora.scheduler.BatchWorker; import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.DelayExecutor; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; @@ -46,7 +46,7 @@ class TaskThrottler implements EventSubscriber { private final RescheduleCalculator rescheduleCalculator; private final Clock clock; - private final DelayExecutor executor; + private final ScheduledExecutorService executor; private final StateManager stateManager; private final TaskEventBatchWorker batchWorker; @@ -56,7 +56,7 @@ class TaskThrottler implements EventSubscriber { TaskThrottler( RescheduleCalculator rescheduleCalculator, Clock clock, - @AsyncExecutor DelayExecutor executor, + @AsyncExecutor ScheduledExecutorService executor, StateManager stateManager, TaskEventBatchWorker batchWorker) { @@ -74,7 +74,7 @@ class TaskThrottler implements EventSubscriber { + rescheduleCalculator.getFlappingPenaltyMs(stateChange.getTask()); long delayMs = Math.max(0, readyAtMs - clock.nowMillis()); throttleStats.accumulate(delayMs); - executor.execute(() -> + executor.schedule((Runnable) () -> batchWorker.execute(storeProvider -> { stateManager.changeState( storeProvider, @@ -84,7 +84,8 @@ class TaskThrottler implements EventSubscriber { Optional.absent()); return BatchWorker.NO_RESULT; }), - Amount.of(delayMs, Time.MILLISECONDS)); + delayMs, + TimeUnit.MILLISECONDS); } } } http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java deleted file mode 100644 index a454887..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/AttributeMapper.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.List; - -import javax.annotation.Nullable; - -import org.apache.aurora.gen.HostAttributes; -import org.apache.aurora.gen.MaintenanceMode; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; -import org.apache.ibatis.annotations.Param; - -/** - * MyBatis mapper interface for Attribute.xml. - */ -interface AttributeMapper { - /** - * Saves attributes for a host, based on {@link IHostAttributes#getHost()}. - * - * @param attributes Host attributes to save. - */ - void insert(IHostAttributes attributes); - - /** - * Deletes all attributes and attribute values associated with a slave. - * - * @param host Host to delete associated values from. - */ - void deleteAttributeValues(@Param("host") String host); - - /** - * Updates the mode and slave ID associated with a host. - * - * @param host Host to update. - * @param mode New host maintenance mode. - * @param slaveId New host slave ID. - */ - void updateHostModeAndSlaveId( - @Param("host") String host, - @Param("mode") MaintenanceMode mode, - @Param("slaveId") String slaveId); - - /** - * Inserts values in {@link IHostAttributes#getAttributes()}, associating them with - * {@link IHostAttributes#getSlaveId()}. - * - * @param attributes Attributes containing values to insert. - */ - void insertAttributeValues(IHostAttributes attributes); - - /** - * Retrieves the host attributes associated with a host. - * - * @param host Host to fetch attributes for. - * @return Attributes associated with {@code host}, or {@code null} if no association exists. - */ - @Nullable - HostAttributes select(@Param("host") String host); - - /** - * Retrieves all stored host attributes. - * - * @return All host attributes. - */ - List<HostAttributes> selectAll(); - - /** - * Deletes all stored attributes and values. - */ - void truncate(); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java b/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java deleted file mode 100644 index b07928d..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/CronJobMapper.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.List; - -import javax.annotation.Nullable; - -import org.apache.aurora.scheduler.storage.db.views.DbJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobKey; -import org.apache.ibatis.annotations.Param; - -/** - * MyBatis mapper for cron jobs. - */ -interface CronJobMapper { - - void merge(@Param("job") IJobConfiguration job, @Param("task_config_id") long taskConfigId); - - void delete(@Param("job") IJobKey job); - - void truncate(); - - List<DbJobConfiguration> selectAll(); - - @Nullable - DbJobConfiguration select(@Param("job") IJobKey job); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java deleted file mode 100644 index fee465b..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbAttributeStore.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.Objects; -import java.util.Set; - -import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.google.inject.Inject; - -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.entities.IAttribute; -import org.apache.aurora.scheduler.storage.entities.IHostAttributes; - -import static com.google.common.base.Preconditions.checkArgument; - -import static org.apache.aurora.common.base.MorePreconditions.checkNotBlank; -import static org.apache.aurora.common.inject.TimedInterceptor.Timed; - -/** - * Attribute store backed by a relational database. - */ -class DbAttributeStore implements AttributeStore.Mutable { - - private final AttributeMapper mapper; - - @Inject - DbAttributeStore(AttributeMapper mapper) { - this.mapper = Objects.requireNonNull(mapper); - } - - @Override - public void deleteHostAttributes() { - mapper.truncate(); - } - - @Timed("attribute_store_save") - @Override - public boolean saveHostAttributes(IHostAttributes hostAttributes) { - checkNotBlank(hostAttributes.getHost()); - checkArgument(hostAttributes.isSetMode()); - - if (Iterables.any(hostAttributes.getAttributes(), EMPTY_VALUES)) { - throw new IllegalArgumentException( - "Host attributes contains empty values: " + hostAttributes); - } - - Optional<IHostAttributes> existing = getHostAttributes(hostAttributes.getHost()); - if (existing.equals(Optional.of(hostAttributes))) { - return false; - } else if (existing.isPresent()) { - mapper.updateHostModeAndSlaveId( - hostAttributes.getHost(), - hostAttributes.getMode(), - hostAttributes.getSlaveId()); - } else { - mapper.insert(hostAttributes); - } - - mapper.deleteAttributeValues(hostAttributes.getHost()); - if (!hostAttributes.getAttributes().isEmpty()) { - mapper.insertAttributeValues(hostAttributes); - } - - return true; - } - - private static final Predicate<IAttribute> EMPTY_VALUES = - attribute -> attribute.getValues().isEmpty(); - - @Timed("attribute_store_fetch_one") - @Override - public Optional<IHostAttributes> getHostAttributes(String host) { - return Optional.fromNullable(mapper.select(host)).transform(IHostAttributes::build); - } - - @Timed("attribute_store_fetch_all") - @Override - public Set<IHostAttributes> getHostAttributes() { - return IHostAttributes.setFromBuilders(mapper.selectAll()); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java deleted file mode 100644 index e48a982..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbCronJobStore.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import javax.inject.Inject; - -import com.google.common.base.Optional; -import com.google.common.collect.FluentIterable; - -import org.apache.aurora.common.inject.TimedInterceptor.Timed; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.db.views.DbJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobConfiguration; -import org.apache.aurora.scheduler.storage.entities.IJobKey; - -import static java.util.Objects.requireNonNull; - -/** - * Cron job store backed by a relational database. - */ -class DbCronJobStore implements CronJobStore.Mutable { - private final CronJobMapper cronJobMapper; - private final JobKeyMapper jobKeyMapper; - private final TaskConfigManager taskConfigManager; - - @Inject - DbCronJobStore( - CronJobMapper cronJobMapper, - JobKeyMapper jobKeyMapper, - TaskConfigManager taskConfigManager) { - - this.cronJobMapper = requireNonNull(cronJobMapper); - this.jobKeyMapper = requireNonNull(jobKeyMapper); - this.taskConfigManager = requireNonNull(taskConfigManager); - } - - @Timed("db_storage_cron_save_accepted_job") - @Override - public void saveAcceptedJob(IJobConfiguration jobConfig) { - requireNonNull(jobConfig); - jobKeyMapper.merge(jobConfig.getKey()); - cronJobMapper.merge(jobConfig, taskConfigManager.insert(jobConfig.getTaskConfig())); - } - - @Timed("db_storage_cron_remove_job") - @Override - public void removeJob(IJobKey jobKey) { - requireNonNull(jobKey); - cronJobMapper.delete(jobKey); - } - - @Timed("db_storage_cron_delete_jobs") - @Override - public void deleteJobs() { - cronJobMapper.truncate(); - } - - @Timed("db_storage_cron_fetch_jobs") - @Override - public Iterable<IJobConfiguration> fetchJobs() { - return FluentIterable.from(cronJobMapper.selectAll()) - .transform(DbJobConfiguration::toImmutable) - .toList(); - } - - @Timed("db_storage_cron_fetch_job") - @Override - public Optional<IJobConfiguration> fetchJob(IJobKey jobKey) { - requireNonNull(jobKey); - return Optional.fromNullable(cronJobMapper.select(jobKey)) - .transform(DbJobConfiguration::toImmutable); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java deleted file mode 100644 index af854da..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbJobUpdateStore.java +++ /dev/null @@ -1,268 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -import javax.inject.Inject; - -import com.google.common.base.Function; -import com.google.common.base.Optional; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.FluentIterable; -import com.google.common.collect.ImmutableSet; - -import org.apache.aurora.common.base.MorePreconditions; -import org.apache.aurora.common.stats.StatsProvider; -import org.apache.aurora.gen.JobUpdateAction; -import org.apache.aurora.gen.JobUpdateStatus; -import org.apache.aurora.gen.storage.StoredJobUpdateDetails; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.Util; -import org.apache.aurora.scheduler.storage.db.views.DbJobUpdate; -import org.apache.aurora.scheduler.storage.db.views.DbJobUpdateInstructions; -import org.apache.aurora.scheduler.storage.db.views.DbStoredJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IInstanceTaskConfig; -import org.apache.aurora.scheduler.storage.entities.IJobInstanceUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdate; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateDetails; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateEvent; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateInstructions; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateKey; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateQuery; -import org.apache.aurora.scheduler.storage.entities.IJobUpdateSummary; -import org.apache.aurora.scheduler.storage.entities.IMetadata; -import org.apache.aurora.scheduler.storage.entities.IRange; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.common.inject.TimedInterceptor.Timed; - -/** - * A relational database-backed job update store. - */ -public class DbJobUpdateStore implements JobUpdateStore.Mutable { - - private final JobKeyMapper jobKeyMapper; - private final JobUpdateDetailsMapper detailsMapper; - private final JobUpdateEventMapper jobEventMapper; - private final JobInstanceUpdateEventMapper instanceEventMapper; - private final TaskConfigManager taskConfigManager; - private final LoadingCache<JobUpdateStatus, AtomicLong> jobUpdateEventStats; - private final LoadingCache<JobUpdateAction, AtomicLong> jobUpdateActionStats; - - @Inject - DbJobUpdateStore( - JobKeyMapper jobKeyMapper, - JobUpdateDetailsMapper detailsMapper, - JobUpdateEventMapper jobEventMapper, - JobInstanceUpdateEventMapper instanceEventMapper, - TaskConfigManager taskConfigManager, - StatsProvider statsProvider) { - - this.jobKeyMapper = requireNonNull(jobKeyMapper); - this.detailsMapper = requireNonNull(detailsMapper); - this.jobEventMapper = requireNonNull(jobEventMapper); - this.instanceEventMapper = requireNonNull(instanceEventMapper); - this.taskConfigManager = requireNonNull(taskConfigManager); - this.jobUpdateEventStats = CacheBuilder.newBuilder() - .build(new CacheLoader<JobUpdateStatus, AtomicLong>() { - @Override - public AtomicLong load(JobUpdateStatus status) { - return statsProvider.makeCounter(Util.jobUpdateStatusStatName(status)); - } - }); - for (JobUpdateStatus status : JobUpdateStatus.values()) { - jobUpdateEventStats.getUnchecked(status).get(); - } - this.jobUpdateActionStats = CacheBuilder.newBuilder() - .build(new CacheLoader<JobUpdateAction, AtomicLong>() { - @Override - public AtomicLong load(JobUpdateAction action) { - return statsProvider.makeCounter(Util.jobUpdateActionStatName(action)); - } - }); - for (JobUpdateAction action : JobUpdateAction.values()) { - jobUpdateActionStats.getUnchecked(action).get(); - } - } - - @Timed("job_update_store_save_update") - @Override - public void saveJobUpdate(IJobUpdate update, Optional<String> lockToken) { - requireNonNull(update); - if (!update.getInstructions().isSetDesiredState() - && update.getInstructions().getInitialState().isEmpty()) { - throw new IllegalArgumentException( - "Missing both initial and desired states. At least one is required."); - } - - IJobUpdateKey key = update.getSummary().getKey(); - jobKeyMapper.merge(key.getJob()); - detailsMapper.insert(update.newBuilder()); - - if (lockToken.isPresent()) { - detailsMapper.insertLockToken(key, lockToken.get()); - } - - if (!update.getSummary().getMetadata().isEmpty()) { - detailsMapper.insertJobUpdateMetadata( - key, - IMetadata.toBuildersSet(update.getSummary().getMetadata())); - } - - // Insert optional instance update overrides. - Set<IRange> instanceOverrides = - update.getInstructions().getSettings().getUpdateOnlyTheseInstances(); - - if (!instanceOverrides.isEmpty()) { - detailsMapper.insertInstanceOverrides(key, IRange.toBuildersSet(instanceOverrides)); - } - - // Insert desired state task config and instance mappings. - if (update.getInstructions().isSetDesiredState()) { - IInstanceTaskConfig desired = update.getInstructions().getDesiredState(); - detailsMapper.insertTaskConfig( - key, - taskConfigManager.insert(desired.getTask()), - true, - new InsertResult()); - - detailsMapper.insertDesiredInstances( - key, - IRange.toBuildersSet(MorePreconditions.checkNotBlank(desired.getInstances()))); - } - - // Insert initial state task configs and instance mappings. - if (!update.getInstructions().getInitialState().isEmpty()) { - for (IInstanceTaskConfig config : update.getInstructions().getInitialState()) { - InsertResult result = new InsertResult(); - detailsMapper.insertTaskConfig( - key, - taskConfigManager.insert(config.getTask()), - false, - result); - - detailsMapper.insertTaskConfigInstances( - result.getId(), - IRange.toBuildersSet(MorePreconditions.checkNotBlank(config.getInstances()))); - } - } - } - - @Timed("job_update_store_save_event") - @Override - public void saveJobUpdateEvent(IJobUpdateKey key, IJobUpdateEvent event) { - jobEventMapper.insert(key, event.newBuilder()); - jobUpdateEventStats.getUnchecked(event.getStatus()).incrementAndGet(); - } - - @Timed("job_update_store_save_instance_event") - @Override - public void saveJobInstanceUpdateEvent(IJobUpdateKey key, IJobInstanceUpdateEvent event) { - instanceEventMapper.insert(key, event.newBuilder()); - jobUpdateActionStats.getUnchecked(event.getAction()).incrementAndGet(); - } - - @Timed("job_update_store_delete_all") - @Override - public void deleteAllUpdatesAndEvents() { - detailsMapper.truncate(); - } - - private static final Function<PruneVictim, IJobUpdateKey> GET_UPDATE_KEY = - victim -> IJobUpdateKey.build(victim.getUpdate()); - - @Timed("job_update_store_prune_history") - @Override - public Set<IJobUpdateKey> pruneHistory(int perJobRetainCount, long historyPruneThresholdMs) { - ImmutableSet.Builder<IJobUpdateKey> pruned = ImmutableSet.builder(); - - Set<Long> jobKeyIdsToPrune = detailsMapper.selectJobKeysForPruning( - perJobRetainCount, - historyPruneThresholdMs); - - for (long jobKeyId : jobKeyIdsToPrune) { - Set<PruneVictim> pruneVictims = detailsMapper.selectPruneVictims( - jobKeyId, - perJobRetainCount, - historyPruneThresholdMs); - - detailsMapper.deleteCompletedUpdates( - FluentIterable.from(pruneVictims).transform(PruneVictim::getRowId).toSet()); - pruned.addAll(FluentIterable.from(pruneVictims).transform(GET_UPDATE_KEY)); - } - - return pruned.build(); - } - - @Timed("job_update_store_fetch_summaries") - @Override - public List<IJobUpdateSummary> fetchJobUpdateSummaries(IJobUpdateQuery query) { - return IJobUpdateSummary.listFromBuilders(detailsMapper.selectSummaries(query.newBuilder())); - } - - @Timed("job_update_store_fetch_details_list") - @Override - public List<IJobUpdateDetails> fetchJobUpdateDetails(IJobUpdateQuery query) { - return FluentIterable - .from(detailsMapper.selectDetailsList(query.newBuilder())) - .transform(DbStoredJobUpdateDetails::toThrift) - .transform(StoredJobUpdateDetails::getDetails) - .transform(IJobUpdateDetails::build) - .toList(); - } - - @Timed("job_update_store_fetch_details") - @Override - public Optional<IJobUpdateDetails> fetchJobUpdateDetails(final IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectDetails(key)) - .transform(DbStoredJobUpdateDetails::toThrift) - .transform(StoredJobUpdateDetails::getDetails) - .transform(IJobUpdateDetails::build); - } - - @Timed("job_update_store_fetch_update") - @Override - public Optional<IJobUpdate> fetchJobUpdate(IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectUpdate(key)) - .transform(DbJobUpdate::toImmutable); - } - - @Timed("job_update_store_fetch_instructions") - @Override - public Optional<IJobUpdateInstructions> fetchJobUpdateInstructions(IJobUpdateKey key) { - return Optional.fromNullable(detailsMapper.selectInstructions(key)) - .transform(DbJobUpdateInstructions::toImmutable); - } - - @Timed("job_update_store_fetch_all_details") - @Override - public Set<StoredJobUpdateDetails> fetchAllJobUpdateDetails() { - return FluentIterable.from(detailsMapper.selectAllDetails()) - .transform(DbStoredJobUpdateDetails::toThrift) - .toSet(); - } - - @Timed("job_update_store_fetch_instance_events") - @Override - public List<IJobInstanceUpdateEvent> fetchInstanceEvents(IJobUpdateKey key, int instanceId) { - return IJobInstanceUpdateEvent.listFromBuilders( - detailsMapper.selectInstanceUpdateEvents(key, instanceId)); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java deleted file mode 100644 index 9e28550..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbLockStore.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; - -import com.google.inject.Inject; - -import org.apache.aurora.GuavaUtils; -import org.apache.aurora.scheduler.storage.LockStore; -import org.apache.aurora.scheduler.storage.db.views.LockRow; -import org.apache.aurora.scheduler.storage.entities.ILock; -import org.apache.aurora.scheduler.storage.entities.ILockKey; - -import static java.util.Objects.requireNonNull; - -import static org.apache.aurora.common.inject.TimedInterceptor.Timed; - -/** - * A relational database-backed lock store. - */ -class DbLockStore implements LockStore.Mutable { - - private final LockMapper mapper; - private final LockKeyMapper lockKeyMapper; - - @Inject - DbLockStore(LockMapper mapper, LockKeyMapper lockKeyMapper) { - this.mapper = requireNonNull(mapper); - this.lockKeyMapper = requireNonNull(lockKeyMapper); - } - - @Timed("lock_store_save_lock") - @Override - public void saveLock(ILock lock) { - lockKeyMapper.insert(lock.getKey()); - mapper.insert(lock.newBuilder()); - } - - @Timed("lock_store_remove_lock") - @Override - public void removeLock(ILockKey lockKey) { - mapper.delete(lockKey.newBuilder()); - } - - @Timed("lock_store_delete_locks") - @Override - public void deleteLocks() { - mapper.truncate(); - } - - @Timed("lock_store_fetch_locks") - @Override - public Set<ILock> fetchLocks() { - return mapper.selectAll().stream().map(TO_ROW).collect(GuavaUtils.toImmutableSet()); - } - - @Timed("lock_store_fetch_lock") - @Override - public Optional<ILock> fetchLock(ILockKey lockKey) { - return Optional.ofNullable(mapper.select(lockKey.newBuilder())).map(TO_ROW); - } - - /** - * LockRow converter to satisfy the ILock interface. - */ - private static final Function<LockRow, ILock> TO_ROW = input -> ILock.build(input.getLock()); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/94276046/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java deleted file mode 100644 index 7bd37f7..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java +++ /dev/null @@ -1,364 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.db; - -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import javax.inject.Singleton; - -import com.beust.jcommander.Parameter; -import com.beust.jcommander.Parameters; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.AbstractScheduledService; -import com.google.inject.AbstractModule; -import com.google.inject.Key; -import com.google.inject.Module; -import com.google.inject.PrivateModule; -import com.google.inject.TypeLiteral; -import com.google.inject.util.Modules; - -import org.apache.aurora.common.inject.Bindings.KeyFactory; -import org.apache.aurora.common.quantity.Amount; -import org.apache.aurora.common.quantity.Time; -import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; -import org.apache.aurora.scheduler.async.GatedWorkQueue; -import org.apache.aurora.scheduler.config.types.TimeAmount; -import org.apache.aurora.scheduler.storage.AttributeStore; -import org.apache.aurora.scheduler.storage.CronJobStore; -import org.apache.aurora.scheduler.storage.JobUpdateStore; -import org.apache.aurora.scheduler.storage.LockStore; -import org.apache.aurora.scheduler.storage.QuotaStore; -import org.apache.aurora.scheduler.storage.SchedulerStore; -import org.apache.aurora.scheduler.storage.Storage; -import org.apache.aurora.scheduler.storage.TaskStore; -import org.apache.aurora.scheduler.storage.db.typehandlers.TypeHandlers; -import org.apache.ibatis.migration.JavaMigrationLoader; -import org.apache.ibatis.migration.MigrationLoader; -import org.apache.ibatis.session.AutoMappingBehavior; -import org.apache.ibatis.session.SqlSessionFactory; -import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory; -import org.mybatis.guice.MyBatisModule; -import org.mybatis.guice.datasource.builtin.PooledDataSourceProvider; -import org.mybatis.guice.datasource.helper.JdbcHelper; - -import static java.util.Objects.requireNonNull; - -import static com.google.inject.name.Names.bindProperties; - -/** - * Binding module for a relational database storage system. - */ -public final class DbModule extends PrivateModule { - - @Parameters(separators = "=") - public static class Options { - @Parameter(names = "-enable_db_metrics", - description = - "Whether to use MyBatis interceptor to measure the timing of intercepted Statements.", - arity = 1) - public boolean enableDbMetrics = true; - - @Parameter(names = "-slow_query_log_threshold", - description = "Log all queries that take at least this long to execute.") - public TimeAmount slowQueryLogThreshold = new TimeAmount(25, Time.MILLISECONDS); - - @Parameter(names = "-db_row_gc_interval", - description = "Interval on which to scan the database for unused row references.") - public TimeAmount dbRowGcInterval = new TimeAmount(2, Time.HOURS); - - // http://h2database.com/html/grammar.html#set_lock_timeout - @Parameter(names = "-db_lock_timeout", description = "H2 table lock timeout") - public TimeAmount h2LockTimeout = new TimeAmount(1, Time.MINUTES); - - @Parameter(names = "-db_max_active_connection_count", - description = "Max number of connections to use with database via MyBatis") - public int mybatisMaxActiveConnectionCount = -1; - - @Parameter(names = "-db_max_idle_connection_count", - description = "Max number of idle connections to the database via MyBatis") - public int mybatisMaxIdleConnectionCount = -1; - } - - private static final Set<Class<?>> MAPPER_CLASSES = ImmutableSet.<Class<?>>builder() - .add(AttributeMapper.class) - .add(CronJobMapper.class) - .add(EnumValueMapper.class) - .add(FrameworkIdMapper.class) - .add(JobInstanceUpdateEventMapper.class) - .add(JobKeyMapper.class) - .add(JobUpdateEventMapper.class) - .add(JobUpdateDetailsMapper.class) - .add(LockMapper.class) - .add(MigrationMapper.class) - .add(QuotaMapper.class) - .add(TaskConfigMapper.class) - .add(TaskMapper.class) - .build(); - - private final Options options; - private final KeyFactory keyFactory; - private final String jdbcSchema; - - private DbModule( - Options options, - KeyFactory keyFactory, - String dbName, - Map<String, String> jdbcUriArgs) { - - this.options = requireNonNull(options); - this.keyFactory = requireNonNull(keyFactory); - - Map<String, String> args = ImmutableMap.<String, String>builder() - .putAll(jdbcUriArgs) - // READ COMMITTED transaction isolation. More details here - // http://www.h2database.com/html/advanced.html?#transaction_isolation - .put("LOCK_MODE", "3") - // Send log messages from H2 to SLF4j - // See http://www.h2database.com/html/features.html#other_logging - .put("TRACE_LEVEL_FILE", "4") - // Enable Query Statistics - .put("QUERY_STATISTICS", "TRUE") - // Configure the lock timeout - .put("LOCK_TIMEOUT", options.h2LockTimeout.as(Time.MILLISECONDS).toString()) - .build(); - this.jdbcSchema = dbName + ";" + Joiner.on(";").withKeyValueSeparator("=").join(args); - } - - /** - * Creates a module that will prepare a volatile storage system suitable for use in a production - * environment. - * - * @param keyFactory Binding scope for the storage system. - * @return A new database module for production. - */ - public static Module productionModule(KeyFactory keyFactory, DbModule.Options options) { - return new DbModule( - options, - keyFactory, - "aurora", - ImmutableMap.of("DB_CLOSE_DELAY", "-1")); - } - - @VisibleForTesting - public static Module testModule(KeyFactory keyFactory) { - DbModule.Options options = new DbModule.Options(); - return new DbModule( - options, - keyFactory, - "testdb-" + UUID.randomUUID().toString(), - // A non-zero close delay is used here to avoid eager database cleanup in tests that - // make use of multiple threads. Since all test databases are separately scoped by the - // included UUID, multiple DB instances will overlap in time but they should be distinct - // in content. - ImmutableMap.of("DB_CLOSE_DELAY", "5")); - } - - /** - * Same as {@link #testModuleWithWorkQueue(KeyFactory)} but with default task store and - * key factory. - * - * @return A new database module for testing. - */ - @VisibleForTesting - public static Module testModule() { - return testModule(KeyFactory.PLAIN); - } - - /** - * Creates a module that will prepare a private in-memory database, using a specific task store - * implementation bound within the key factory and provided module. - * - * @param keyFactory Key factory to use. - * @return A new database module for testing. - */ - @VisibleForTesting - public static Module testModuleWithWorkQueue(KeyFactory keyFactory) { - return Modules.combine( - new AbstractModule() { - @Override - protected void configure() { - bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance( - new GatedWorkQueue() { - @Override - public <T, E extends Exception> T closeDuring( - GatedOperation<T, E> operation) throws E { - - return operation.doWithGateClosed(); - } - }); - } - }, - testModule(keyFactory) - ); - } - - /** - * Same as {@link #testModuleWithWorkQueue(KeyFactory)} but with default key factory. - * - * @return A new database module for testing. - */ - @VisibleForTesting - public static Module testModuleWithWorkQueue() { - return testModuleWithWorkQueue(KeyFactory.PLAIN); - } - - private <T> void bindStore(Class<T> binding, Class<? extends T> impl) { - bind(binding).to(impl); - bind(impl).in(Singleton.class); - Key<T> key = keyFactory.create(binding); - bind(key).to(impl); - expose(key); - } - - @Override - protected void configure() { - install(new MyBatisModule() { - @Override - protected void initialize() { - if (options.enableDbMetrics) { - addInterceptorClass(InstrumentingInterceptor.class); - } - - bindProperties(binder(), ImmutableMap.of("JDBC.schema", jdbcSchema)); - install(JdbcHelper.H2_IN_MEMORY_NAMED); - - // We have no plans to take advantage of multiple DB environments. This is a - // required property though, so we use an unnamed environment. - environmentId(""); - - bindTransactionFactoryType(JdbcTransactionFactory.class); - bindDataSourceProviderType(PooledDataSourceProvider.class); - addMapperClasses(MAPPER_CLASSES); - - // Full auto-mapping enables population of nested objects with minimal mapper configuration. - // Docs on settings can be found here: - // http://mybatis.github.io/mybatis-3/configuration.html#settings - autoMappingBehavior(AutoMappingBehavior.FULL); - - addTypeHandlersClasses(TypeHandlers.getAll()); - - bind(new TypeLiteral<Amount<Long, Time>>() { }) - .toInstance(options.slowQueryLogThreshold); - - // Enable a ping query which will prevent the use of invalid connections in the - // connection pool. - bindProperties(binder(), ImmutableMap.of("mybatis.pooled.pingEnabled", "true")); - bindProperties(binder(), ImmutableMap.of("mybatis.pooled.pingQuery", "SELECT 1;")); - - if (options.mybatisMaxActiveConnectionCount > 0) { - String val = String.valueOf(options.mybatisMaxActiveConnectionCount); - bindProperties(binder(), ImmutableMap.of("mybatis.pooled.maximumActiveConnections", val)); - } - - if (options.mybatisMaxIdleConnectionCount > 0) { - String val = String.valueOf(options.mybatisMaxIdleConnectionCount); - bindProperties(binder(), ImmutableMap.of("mybatis.pooled.maximumIdleConnections", val)); - } - - // Exposed for unit tests. - bind(TaskConfigManager.class); - expose(TaskConfigManager.class); - - // TODO(wfarner): Don't expose these bindings once the task store is directly bound here. - expose(TaskMapper.class); - expose(TaskConfigManager.class); - expose(JobKeyMapper.class); - } - }); - expose(keyFactory.create(CronJobStore.Mutable.class)); - expose(keyFactory.create(TaskStore.Mutable.class)); - - bindStore(AttributeStore.Mutable.class, DbAttributeStore.class); - bindStore(LockStore.Mutable.class, DbLockStore.class); - bindStore(QuotaStore.Mutable.class, DbQuotaStore.class); - bindStore(SchedulerStore.Mutable.class, DbSchedulerStore.class); - bindStore(JobUpdateStore.Mutable.class, DbJobUpdateStore.class); - bindStore(TaskStore.Mutable.class, DbTaskStore.class); - bindStore(CronJobStore.Mutable.class, DbCronJobStore.class); - - Key<Storage> storageKey = keyFactory.create(Storage.class); - bind(storageKey).to(DbStorage.class); - bind(DbStorage.class).in(Singleton.class); - expose(storageKey); - - bind(EnumBackfill.class).to(EnumBackfill.EnumBackfillImpl.class); - bind(EnumBackfill.EnumBackfillImpl.class).in(Singleton.class); - expose(EnumBackfill.class); - - expose(DbStorage.class); - expose(SqlSessionFactory.class); - expose(TaskMapper.class); - expose(TaskConfigMapper.class); - expose(JobKeyMapper.class); - } - - /** - * Module that sets up a periodic database garbage-collection routine. - */ - public static class GarbageCollectorModule extends AbstractModule { - - private final Options options; - - public GarbageCollectorModule(Options options) { - this.options = options; - } - - @Override - protected void configure() { - install(new PrivateModule() { - @Override - protected void configure() { - bind(RowGarbageCollector.class).in(Singleton.class); - bind(AbstractScheduledService.Scheduler.class).toInstance( - AbstractScheduledService.Scheduler.newFixedRateSchedule( - 0L, - options.dbRowGcInterval.getValue(), - options.dbRowGcInterval.getUnit().getTimeUnit())); - expose(RowGarbageCollector.class); - } - }); - SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()) - .to(RowGarbageCollector.class); - } - } - - public static class MigrationManagerModule extends PrivateModule { - private static final String MIGRATION_PACKAGE = - "org.apache.aurora.scheduler.storage.db.migration"; - - private final MigrationLoader migrationLoader; - - public MigrationManagerModule() { - this.migrationLoader = new JavaMigrationLoader(MIGRATION_PACKAGE); - } - - public MigrationManagerModule(MigrationLoader migrationLoader) { - this.migrationLoader = requireNonNull(migrationLoader); - } - - @Override - protected void configure() { - bind(MigrationLoader.class).toInstance(migrationLoader); - - bind(MigrationManager.class).to(MigrationManagerImpl.class); - expose(MigrationManager.class); - } - } -}