Repository: mesos Updated Branches: refs/heads/master b9b9035c0 -> 93bb63277
Fixed fetcher cache test race for resource offers when starting tasks and changed corresponding CHECK to EXPECT. Plus no more CHECK_READY inside launchTask(s)(), but we return a Try instead and follow call sites with EXPECT_SOME(task(s)). Review: https://reviews.apache.org/r/35438 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/93bb6327 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/93bb6327 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/93bb6327 Branch: refs/heads/master Commit: 93bb63277673b4422669b758f5fecd48afc49097 Parents: b9b9035 Author: Bernd Mathiske <be...@mesosphere.io> Authored: Mon Jun 22 16:46:49 2015 +0200 Committer: Bernd Mathiske <be...@mesosphere.io> Committed: Mon Jun 22 16:46:50 2015 +0200 ---------------------------------------------------------------------- src/tests/fetcher_cache_tests.cpp | 189 +++++++++++++++++++-------------- 1 file changed, 111 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/93bb6327/src/tests/fetcher_cache_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/fetcher_cache_tests.cpp b/src/tests/fetcher_cache_tests.cpp index 8bd5dd8..f29f319 100644 --- a/src/tests/fetcher_cache_tests.cpp +++ b/src/tests/fetcher_cache_tests.cpp @@ -136,9 +136,9 @@ protected: // recovery testing. void stopSlave(); - Task launchTask(const CommandInfo& commandInfo, const size_t taskIndex); + Try<Task> launchTask(const CommandInfo& commandInfo, const size_t taskIndex); - vector<Task> launchTasks(const vector<CommandInfo>& commandInfos); + Try<vector<Task>> launchTasks(const vector<CommandInfo>& commandInfos); // Waits until FetcherProcess::run() has been called for all tasks. 
void awaitFetchContention(); @@ -202,7 +202,7 @@ void FetcherCacheTest::SetUp() // "cover" is necessary, because we only add relevant mock actions // in launchTask() and launchTasks() AFTER starting the driver. EXPECT_CALL(scheduler, resourceOffers(driver, _)) - .WillRepeatedly(Return()); + .WillRepeatedly(DeclineOffers()); } @@ -346,7 +346,7 @@ ACTION_P(PushTaskStatus, taskStatusQueue) // awaitFinished(task), where `task` is the return value of this method. // TODO(bernd-mesos): Make this abstraction as generic and generally // available for all testing as possible. -FetcherCacheTest::Task FetcherCacheTest::launchTask( +Try<FetcherCacheTest::Task> FetcherCacheTest::launchTask( const CommandInfo& commandInfo, const size_t taskIndex) { @@ -355,8 +355,15 @@ FetcherCacheTest::Task FetcherCacheTest::launchTask( .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(DeclineOffers()); + // The default timeout in AWAIT_READY is 15 seconds, + // so we use that amount here. + // TODO(bernd-mesos): Make this a symbolic constant in "gtest.hpp" + // that we can reference here. offers.await(Seconds(15)); - CHECK_READY(offers) << "Failed to wait for resource offers"; + if (!offers.isReady()) { + return Error("Failed to wait for resource offers: " + + (offers.isFailed() ? offers.failure() : "discarded")); + } EXPECT_NE(0u, offers.get().size()); const Offer offer = offers.get()[0]; @@ -439,7 +446,7 @@ ACTION_P(SatisfyOne, promises) // method. // TODO(bernd-mesos): Make this abstraction as generic and generally // available for all testing as possible. 
-vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks( +Try<vector<FetcherCacheTest::Task>> FetcherCacheTest::launchTasks( const vector<CommandInfo>& commandInfos) { vector<FetcherCacheTest::Task> result; @@ -457,8 +464,15 @@ vector<FetcherCacheTest::Task> FetcherCacheTest::launchTasks( .WillOnce(FutureArg<1>(&offers)) .WillRepeatedly(DeclineOffers()); + // The default timeout in AWAIT_READY is 15 seconds, + // so we use that amount here. + // TODO(bernd-mesos): Make this a symbolic constant in "gtest.hpp" + // that we can reference here. offers.await(Seconds(15)); - CHECK_READY(offers) << "Failed to wait for resource offers"; + if (!offers.isReady()) { + return Error("Failed to wait for resource offers: " + + (offers.isFailed() ? offers.failure() : "discarded")); + } EXPECT_NE(0u, offers.get().size()); const Offer offer = offers.get()[0]; @@ -545,15 +559,16 @@ TEST_F(FetcherCacheTest, LocalUncached) commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); EXPECT_EQ(0u, fetcherProcess->cacheSize()); EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); EXPECT_EQ(0u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); - const string path = path::join(task.runDirectory.value, COMMAND_NAME); + const string path = path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); } @@ -578,11 +593,12 @@ TEST_F(FetcherCacheTest, LocalCached) commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); - 
const string path = path::join(task.runDirectory.value, COMMAND_NAME); + const string path = path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -621,11 +637,12 @@ TEST_F(FetcherCacheTest, CachedFallback) Invoke(fetcherProcess, &MockFetcherProcess::unmocked_run))); - const Task task = launchTask(commandInfo, 0); + const Try<Task> task = launchTask(commandInfo, 0); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); - const string path = path::join(task.runDirectory.value, COMMAND_NAME); + const string path = path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(0))); @@ -657,17 +674,18 @@ TEST_F(FetcherCacheTest, LocalUncachedExtract) commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); EXPECT_TRUE(os::exists( - path::join(task.runDirectory.value, ARCHIVE_NAME))); + path::join(task.get().runDirectory.value, ARCHIVE_NAME))); EXPECT_FALSE(isExecutable( - path::join(task.runDirectory.value, ARCHIVE_NAME))); + path::join(task.get().runDirectory.value, ARCHIVE_NAME))); const string path = - path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME); + path::join(task.get().runDirectory.value, ARCHIVED_COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -694,15 +712,16 @@ TEST_F(FetcherCacheTest, LocalCachedExtract) commandInfo.set_value("./" + ARCHIVED_COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - 
AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); EXPECT_FALSE(os::exists( - path::join(task.runDirectory.value, ARCHIVE_NAME))); + path::join(task.get().runDirectory.value, ARCHIVE_NAME))); const string path = - path::join(task.runDirectory.value, ARCHIVED_COMMAND_NAME); + path::join(task.get().runDirectory.value, ARCHIVED_COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -834,12 +853,13 @@ TEST_F(FetcherCacheHttpTest, HttpCachedSerialized) commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); const string path = - path::join(task.runDirectory.value, COMMAND_NAME); + path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -891,9 +911,10 @@ TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent) commandInfos.push_back(commandInfo); } - vector<Task> tasks = launchTasks(commandInfos); + Try<vector<Task>> tasks = launchTasks(commandInfos); + EXPECT_SOME(tasks); - CHECK_EQ(countTasks, tasks.size()); + CHECK_EQ(countTasks, tasks.get().size()); // Given pausing the HTTP server, this proves that fetch contention // has happened. All tasks have passed the point where it occurs, @@ -903,25 +924,28 @@ TEST_F(FetcherCacheHttpTest, HttpCachedConcurrent) // Now let the tasks run. 
httpServer->resume(); - AWAIT_READY(awaitFinished(tasks)); + AWAIT_READY(awaitFinished(tasks.get())); EXPECT_EQ(1u, fetcherProcess->cacheSize()); EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); - // command content-length requests: 1 - // command downloads: 1 - // archive downloads: 2 + // HTTP requests regarding the command asset as follows. Command + // "content-length" requests: 1, command file downloads: 1. EXPECT_EQ(2u, httpServer->countCommandRequests); + + // HTTP requests regarding the archive asset as follows. Archive + // "content-length" requests: 0, archive file downloads: 2. EXPECT_EQ(2u, httpServer->countArchiveRequests); for (size_t i = 0; i < countTasks; i++) { EXPECT_EQ(i % 2 == 1, os::exists( - path::join(tasks[i].runDirectory.value, ARCHIVE_NAME))); + path::join(tasks.get()[i].runDirectory.value, ARCHIVE_NAME))); EXPECT_TRUE(isExecutable( - path::join(tasks[i].runDirectory.value, COMMAND_NAME))); + path::join(tasks.get()[i].runDirectory.value, COMMAND_NAME))); EXPECT_TRUE(os::exists( - path::join(tasks[i].runDirectory.value, COMMAND_NAME + taskName(i)))); + path::join(tasks.get()[i].runDirectory.value, + COMMAND_NAME + taskName(i)))); } } @@ -994,9 +1018,10 @@ TEST_F(FetcherCacheHttpTest, HttpMixed) commandInfo2.add_uris()->CopyFrom(uri21); commandInfos.push_back(commandInfo2); - vector<Task> tasks = launchTasks(commandInfos); + Try<vector<Task>> tasks = launchTasks(commandInfos); + EXPECT_SOME(tasks); - CHECK_EQ(3u, tasks.size()); + CHECK_EQ(3u, tasks.get().size()); // Given pausing the HTTP server, this proves that fetch contention // has happened. All tasks have passed the point where it occurs, @@ -1006,54 +1031,56 @@ TEST_F(FetcherCacheHttpTest, HttpMixed) // Now let the tasks run.
httpServer->resume(); - AWAIT_READY(awaitFinished(tasks)); + AWAIT_READY(awaitFinished(tasks.get())); EXPECT_EQ(1u, fetcherProcess->cacheSize()); EXPECT_SOME(fetcherProcess->cacheFiles(slaveId, flags)); EXPECT_EQ(1u, fetcherProcess->cacheFiles(slaveId, flags).get().size()); - // command content-length requests: 0 - // command downloads: 3 - // archive content-length requests: 1 - // archive downloads: 2 + // HTTP requests regarding the command asset as follows. Command + // "content-length" requests: 0, command file downloads: 3. EXPECT_EQ(3u, httpServer->countCommandRequests); + + // HTTP requests regarding the archive asset as follows. Archive + // "content-length" requests: 1, archive file downloads: 2. EXPECT_EQ(3u, httpServer->countArchiveRequests); // Task 0. EXPECT_FALSE(isExecutable( - path::join(tasks[0].runDirectory.value, ARCHIVE_NAME))); + path::join(tasks.get()[0].runDirectory.value, ARCHIVE_NAME))); EXPECT_FALSE(os::exists( - path::join(tasks[0].runDirectory.value, ARCHIVED_COMMAND_NAME))); + path::join(tasks.get()[0].runDirectory.value, ARCHIVED_COMMAND_NAME))); EXPECT_TRUE(isExecutable( - path::join(tasks[0].runDirectory.value, COMMAND_NAME))); + path::join(tasks.get()[0].runDirectory.value, COMMAND_NAME))); EXPECT_TRUE(os::exists( - path::join(tasks[0].runDirectory.value, COMMAND_NAME + taskName(0)))); + path::join(tasks.get()[0].runDirectory.value, + COMMAND_NAME + taskName(0)))); // Task 1. 
EXPECT_FALSE(isExecutable( - path::join(tasks[1].runDirectory.value, ARCHIVE_NAME))); + path::join(tasks.get()[1].runDirectory.value, ARCHIVE_NAME))); EXPECT_TRUE(isExecutable( - path::join(tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME))); + path::join(tasks.get()[1].runDirectory.value, ARCHIVED_COMMAND_NAME))); EXPECT_TRUE(os::exists(path::join( - tasks[1].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(1)))); + tasks.get()[1].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(1)))); EXPECT_FALSE(isExecutable( - path::join(tasks[1].runDirectory.value, COMMAND_NAME))); + path::join(tasks.get()[1].runDirectory.value, COMMAND_NAME))); // Task 2. EXPECT_FALSE(os::exists( - path::join(tasks[2].runDirectory.value, ARCHIVE_NAME))); + path::join(tasks.get()[2].runDirectory.value, ARCHIVE_NAME))); EXPECT_TRUE(isExecutable( - path::join(tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME))); + path::join(tasks.get()[2].runDirectory.value, ARCHIVED_COMMAND_NAME))); EXPECT_TRUE(os::exists(path::join( - tasks[2].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(2)))); + tasks.get()[2].runDirectory.value, ARCHIVED_COMMAND_NAME + taskName(2)))); EXPECT_FALSE(isExecutable( - path::join(tasks[2].runDirectory.value, COMMAND_NAME))); + path::join(tasks.get()[2].runDirectory.value, COMMAND_NAME))); } @@ -1074,11 +1101,12 @@ TEST_F(FetcherCacheHttpTest, HttpCachedRecovery) commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); - const string path = path::join(task.runDirectory.value, COMMAND_NAME); + const string path = path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -1131,12 +1159,13 @@ TEST_F(FetcherCacheHttpTest, HttpCachedRecovery) 
commandInfo.set_value("./" + COMMAND_NAME + " " + taskName(i)); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); const string path = - path::join(task.runDirectory.value, COMMAND_NAME); + path::join(task.get().runDirectory.value, COMMAND_NAME); EXPECT_TRUE(isExecutable(path)); EXPECT_TRUE(os::exists(path + taskName(i))); @@ -1180,15 +1209,16 @@ TEST_F(FetcherCacheTest, SimpleEviction) commandInfo.set_value("./" + command); commandInfo.add_uris()->CopyFrom(uri); - const Task task = launchTask(commandInfo, i); + const Try<Task> task = launchTask(commandInfo, i); + EXPECT_SOME(task); - AWAIT_READY(awaitFinished(task)); + AWAIT_READY(awaitFinished(task.get())); // Check that the task succeeded. EXPECT_TRUE(isExecutable( - path::join(task.runDirectory.value, commandFilename))); + path::join(task.get().runDirectory.value, commandFilename))); EXPECT_TRUE(os::exists( - path::join(task.runDirectory.value, COMMAND_NAME + taskName(i)))); + path::join(task.get().runDirectory.value, COMMAND_NAME + taskName(i)))); if (i < countCacheEntries) { EXPECT_EQ(i + 1, fetcherProcess->cacheSize()); @@ -1257,15 +1287,16 @@ TEST_F(FetcherCacheTest, FallbackFromEviction) commandInfo0.set_value("./" + command0); commandInfo0.add_uris()->CopyFrom(uri0); - const Task task0 = launchTask(commandInfo0, 0); + const Try<Task> task0 = launchTask(commandInfo0, 0); + EXPECT_SOME(task0) << task0.error(); - AWAIT_READY(awaitFinished(task0)); + AWAIT_READY(awaitFinished(task0.get())); // Check that the task succeeded. 
EXPECT_TRUE(isExecutable( - path::join(task0.runDirectory.value, commandFilename0))); + path::join(task0.get().runDirectory.value, commandFilename0))); EXPECT_TRUE(os::exists( - path::join(task0.runDirectory.value, COMMAND_NAME + taskName(0)))); + path::join(task0.get().runDirectory.value, COMMAND_NAME + taskName(0)))); AWAIT_READY(fetcherInfo0); @@ -1305,15 +1336,16 @@ TEST_F(FetcherCacheTest, FallbackFromEviction) commandInfo1.set_value("./" + command1); commandInfo1.add_uris()->CopyFrom(uri1); - const Task task1 = launchTask(commandInfo1, 1); + const Try<Task> task1 = launchTask(commandInfo1, 1); + EXPECT_SOME(task1) << task1.error(); - AWAIT_READY(awaitFinished(task1)); + AWAIT_READY(awaitFinished(task1.get())); // Check that the task succeeded. EXPECT_TRUE(isExecutable( - path::join(task1.runDirectory.value, commandFilename1))); + path::join(task1.get().runDirectory.value, commandFilename1))); EXPECT_TRUE(os::exists( - path::join(task1.runDirectory.value, COMMAND_NAME + taskName(1)))); + path::join(task1.get().runDirectory.value, COMMAND_NAME + taskName(1)))); AWAIT_READY(fetcherInfo1); @@ -1352,15 +1384,16 @@ TEST_F(FetcherCacheTest, FallbackFromEviction) commandInfo2.set_value("./" + command2); commandInfo2.add_uris()->CopyFrom(uri2); - const Task task2 = launchTask(commandInfo2, 2); + const Try<Task> task2 = launchTask(commandInfo2, 2); + EXPECT_SOME(task2) << task2.error(); - AWAIT_READY(awaitFinished(task2)); + AWAIT_READY(awaitFinished(task2.get())); // Check that the task succeeded. EXPECT_TRUE(isExecutable( - path::join(task2.runDirectory.value, commandFilename2))); + path::join(task2.get().runDirectory.value, commandFilename2))); EXPECT_TRUE(os::exists( - path::join(task2.runDirectory.value, COMMAND_NAME + taskName(2)))); + path::join(task2.get().runDirectory.value, COMMAND_NAME + taskName(2)))); AWAIT_READY(fetcherInfo2);