Repository: reef Updated Branches: refs/heads/master 08f096a27 -> 18c25fdcf
[REEF-1641] Remove batch id and change the way to request master/slave evaluators Currently in IMRU, we have different specs for master vs. mapper evaluators. We use EvaluatorBatchId to distinguish which one we requested when receiving an IAllocatedEvaluator. However, EvaluatorBatchId may not be supported in some environments, such as HDInsight, so we need a more reliable way to decide which evaluator is the master. This PR requests the master evaluator first; once it is allocated, it is recorded as the master, and only then are the mapper evaluators requested. This assumes that if the master fails at any stage, the whole system fails. * IMRU Driver and Evaluator Manager are updated * Existing test cases are updated * A new test case is added to simulate master evaluator failure during the WaitingForEvaluator phase. JIRA: [REEF-1641](https://issues.apache.org/jira/browse/REEF-1641) Pull request: This closes #1156 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/18c25fdc Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/18c25fdc Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/18c25fdc Branch: refs/heads/master Commit: 18c25fdcfabb7ba1c56ea374d0d7f6777d99d7fd Parents: 08f096a Author: Julia Wang <jwang98...@yahoo.com> Authored: Fri Oct 14 14:26:36 2016 -0700 Committer: Mariia Mykhailova <mar...@apache.org> Committed: Mon Oct 17 11:21:41 2016 -0700 ---------------------------------------------------------------------- .../TestEvaluatorManager.cs | 36 ++-- .../OnREEF/Driver/EvaluatorManager.cs | 61 +++--- .../OnREEF/Driver/IMRUDriver.cs | 46 ++--- ...tFailUpdateEvaluatorOnWaitingForEvaluator.cs | 186 +++++++++++++++++++ .../Org.Apache.REEF.Tests.csproj | 1 + 5 files changed, 253 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs 
---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs index 1a27917..648d0f5 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs @@ -29,8 +29,6 @@ namespace Org.Apache.REEF.IMRU.Tests public sealed class TestEvaluatorManager { private const string EvaluatorIdPrefix = "EvaluatorId"; - private int _masterBatchIdSquenceNumber = 0; - private int _mapperBatchIdSquenceNumber = 0; /// <summary> /// Test valid add, remove Evaluators @@ -46,13 +44,13 @@ namespace Org.Apache.REEF.IMRU.Tests Assert.True(evaluatorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2)); Assert.False(evaluatorManager.IsMasterEvaluatorFailed()); - evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1); + evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2); Assert.Equal(2, evaluatorManager.NumberOfAllocatedEvaluators); - Assert.True(evaluatorManager.IsMasterEvaluatorFailed()); - Assert.Equal(0, evaluatorManager.NumberofFailedMappers()); + Assert.False(evaluatorManager.IsMasterEvaluatorFailed()); + Assert.Equal(1, evaluatorManager.NumberofFailedMappers()); evaluatorManager.ResetFailedEvaluators(); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2)); Assert.True(evaluatorManager.AreAllEvaluatorsAllocated()); } @@ -63,9 +61,9 @@ namespace Org.Apache.REEF.IMRU.Tests public void TestNoMasterEvaluator() { var evaluatorManager = CreateEvaluatorManager(3, 1); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + 
_mapperBatchIdSquenceNumber++)); - Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2)); + Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3)); Assert.Throws<IMRUSystemException>(add); } @@ -76,9 +74,9 @@ namespace Org.Apache.REEF.IMRU.Tests public void TestTwoMasterEvaluator() { var evaluatorManager = CreateEvaluatorManager(3, 1); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1)); + evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(2)); + Action add = () => evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(3)); Assert.Throws<IMRUSystemException>(add); } @@ -89,9 +87,9 @@ namespace Org.Apache.REEF.IMRU.Tests public void TestTooManyEvaluators() { var evaluatorManager = CreateEvaluatorManager(2, 1); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); - Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(1)); + 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2)); + Action add = () => evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3)); Assert.Throws<IMRUSystemException>(add); } @@ -144,10 +142,10 @@ namespace Org.Apache.REEF.IMRU.Tests private EvaluatorManager CreateTestEvaluators(int totalEvaluators, int allowedNumberOfEvaluatorFailures) { var evaluatorManager = CreateEvaluatorManager(totalEvaluators, allowedNumberOfEvaluatorFailures); - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++)); + evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(1)); for (var i = 2; i <= totalEvaluators; i++) { - evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(i, EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++)); + evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(i)); } return evaluatorManager; } @@ -156,12 +154,10 @@ namespace Org.Apache.REEF.IMRU.Tests /// Create a mocked IAllocatedEvaluator /// </summary> /// <param name="id"></param> - /// <param name="batchId"></param> /// <returns></returns> - private static IAllocatedEvaluator CreateMockAllocatedEvaluator(int id, string batchId) + private static IAllocatedEvaluator CreateMockAllocatedEvaluator(int id) { IAllocatedEvaluator mockAllocatedEvaluator = Substitute.For<IAllocatedEvaluator>(); - mockAllocatedEvaluator.EvaluatorBatchId.Returns(batchId); mockAllocatedEvaluator.Id.Returns(EvaluatorIdPrefix + id); return mockAllocatedEvaluator; } http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs index 5f6856c..fdbb463 100644 --- 
a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs @@ -34,10 +34,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver internal sealed class EvaluatorManager { private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorManager)); - internal const string MasterBatchId = "MasterBatchId"; - internal const string MapperBatchId = "MapperBatchId"; - private int _masterBatchIdSquenceNumber = 0; - private int _mapperBatchIdSquenceNumber = 0; private readonly ISet<string> _allocatedEvaluatorIds = new HashSet<string>(); private readonly ISet<string> _failedEvaluatorIds = new HashSet<string>(); @@ -82,17 +78,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .SetCores(_updateEvaluatorSpecification.Core) .SetMegabytes(_updateEvaluatorSpecification.Megabytes) .SetNumber(1) - .SetEvaluatorBatchId(MasterBatchId + _masterBatchIdSquenceNumber) .Build()); var message = string.Format(CultureInfo.InvariantCulture, - "Submitted master evaluator with core [{0}], memory [{1}] and batch id [{2}].", + "Submitted master evaluator with core [{0}], memory [{1}].", _updateEvaluatorSpecification.Core, - _updateEvaluatorSpecification.Megabytes, - MasterBatchId + _masterBatchIdSquenceNumber); + _updateEvaluatorSpecification.Megabytes); Logger.Log(Level.Info, message); - - _masterBatchIdSquenceNumber++; } /// <summary> @@ -106,29 +98,24 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver .SetMegabytes(_mapperEvaluatorSpecification.Megabytes) .SetNumber(numEvaluators) .SetCores(_mapperEvaluatorSpecification.Core) - .SetEvaluatorBatchId(MapperBatchId + _mapperBatchIdSquenceNumber) .Build()); var message = string.Format(CultureInfo.InvariantCulture, - "Submitted [{0}] mapper evaluators with core [{1}], memory [{2}] and batch id [{3}].", + "Submitted [{0}] mapper evaluators with core [{1}], memory [{2}].", numEvaluators, _mapperEvaluatorSpecification.Core, - _mapperEvaluatorSpecification.Megabytes, - MasterBatchId 
+ _mapperBatchIdSquenceNumber); + _mapperEvaluatorSpecification.Megabytes); Logger.Log(Level.Info, message); - - _mapperBatchIdSquenceNumber++; } /// <summary> /// Add an Evaluator id to _allocatedEvaluators. - /// If the IAllocatedEvaluator is for master, set master Evaluator id /// IMRUSystemException will be thrown in the following cases: /// The Evaluator Id is already in the allocated Evaluator collection /// The added IAllocatedEvaluator is the last one expected, and master Evaluator is still not added yet /// The number of AllocatedEvaluators has reached the total expected Evaluators /// </summary> - /// <param name="evaluator"></param> + /// <param name="evaluator">Evaluator to add</param> internal void AddAllocatedEvaluator(IAllocatedEvaluator evaluator) { if (IsAllocatedEvaluator(evaluator.Id)) @@ -137,17 +124,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver Exceptions.Throw(new IMRUSystemException(msg), Logger); } - if (IsEvaluatorForMaster(evaluator)) - { - SetMasterEvaluatorId(evaluator.Id); - } - if (NumberOfAllocatedEvaluators >= _totalExpectedEvaluators) { string msg = string.Format("Trying to add an additional Evaluator {0}, but the total expected Evaluator number {1} has been reached.", evaluator.Id, _totalExpectedEvaluators); Exceptions.Throw(new IMRUSystemException(msg), Logger); } - + _allocatedEvaluatorIds.Add(evaluator.Id); if (_masterEvaluatorId == null && NumberOfAllocatedEvaluators == _totalExpectedEvaluators) @@ -158,6 +140,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> + /// Add master evaluator + /// </summary> + /// <param name="evaluator">Evaluator to add</param> + internal void AddMasterEvaluator(IAllocatedEvaluator evaluator) + { + SetMasterEvaluatorId(evaluator.Id); + _allocatedEvaluatorIds.Add(evaluator.Id); + } + + /// <summary> /// Remove an Evaluator from allocated Evaluator collection by evaluator id. /// If the given evaluator id is not in allocated Evaluator collection, throw IMRUSystemException. 
/// </summary> @@ -217,7 +209,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Remove failed evaluator from the colletion + /// Remove failed evaluator from the collection /// </summary> /// <param name="evaluatorId"></param> internal void RemoveFailedEvaluator(string evaluatorId) @@ -292,16 +284,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> - /// Checks if the IAllocatedEvaluator is for master - /// </summary> - /// <param name="evaluator"></param> - /// <returns></returns> - internal bool IsEvaluatorForMaster(IAllocatedEvaluator evaluator) - { - return evaluator.EvaluatorBatchId.StartsWith(MasterBatchId); - } - - /// <summary> /// Checks if the evaluator id is the master evaluator id /// </summary> /// <param name="evaluatorId"></param> @@ -316,6 +298,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver } /// <summary> + /// Returns true if the master evaluator has been allocated otherwise false + /// </summary> + /// <returns></returns> + internal bool IsMasterEvaluatorAllocated() + { + return _masterEvaluatorId != null; + } + + /// <summary> /// Checks if the master Evaluator failed /// </summary> /// <returns></returns> http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs index a468e5e..3284859 100644 --- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs +++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs @@ -181,7 +181,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { //// TODO[REEF-598]: Set a timeout for this request to be satisfied. If it is not within that time, exit the Driver. 
_evaluatorManager.RequestUpdateEvaluator(); - _evaluatorManager.RequestMapEvaluators(_totalMappers); } #endregion IDriverStarted @@ -199,7 +198,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver /// <param name="allocatedEvaluator">The allocated evaluator</param> public void OnNext(IAllocatedEvaluator allocatedEvaluator) { - Logger.Log(Level.Info, "AllocatedEvaluator EvaluatorBatchId [{0}], memory [{1}], systemState {2}.", allocatedEvaluator.EvaluatorBatchId, allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState); + Logger.Log(Level.Info, "AllocatedEvaluator memory [{0}], systemState {1}.", allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState); lock (_lock) { using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator")) @@ -207,7 +206,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver switch (_systemState.CurrentState) { case SystemState.WaitingForEvaluator: - _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator); + if (!_evaluatorManager.IsMasterEvaluatorAllocated()) + { + _evaluatorManager.AddMasterEvaluator(allocatedEvaluator); + _evaluatorManager.RequestMapEvaluators(_totalMappers); + } + else + { + _evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator); + } SubmitContextAndService(allocatedEvaluator); break; case SystemState.Fail: @@ -232,7 +239,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver private void SubmitContextAndService(IAllocatedEvaluator allocatedEvaluator) { ContextAndServiceConfiguration configs; - if (_evaluatorManager.IsEvaluatorForMaster(allocatedEvaluator)) + if (_evaluatorManager.IsMasterEvaluatorId(allocatedEvaluator.Id)) { configs = _serviceAndContextConfigurationProvider @@ -502,26 +509,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver switch (_systemState.CurrentState) { case SystemState.WaitingForEvaluator: - if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()) + if (!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() && !isMaster) { - if (isMaster) - { 
- Logger.Log(Level.Info, "Requesting a master Evaluator."); - _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id); - _evaluatorManager.RequestUpdateEvaluator(); - } - else - { - _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( - failedEvaluator.Id); - Logger.Log(Level.Info, "Requesting mapper Evaluators."); - _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id); - _evaluatorManager.RequestMapEvaluators(1); - } + _serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider( + failedEvaluator.Id); + Logger.Log(Level.Info, "Requesting mapper Evaluators."); + _evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id); + _evaluatorManager.RequestMapEvaluators(1); } else { - Logger.Log(Level.Error, "The system is not recoverable, change the state to Fail."); + var reason1 = _evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() + ? "it exceeded MaximumNumberOfEvaluatorFailures, " + : ""; + var reason2 = isMaster ? "master evaluator failed, " : ""; + Logger.Log(Level.Error, "The system is not recoverable because " + reason1 + reason2 + " changing the system state to Fail."); _systemState.MoveNext(SystemStateEvent.NotRecoverable); FailAction(); } @@ -712,8 +714,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver { ShutDownAllEvaluators(); var msg = string.Format(CultureInfo.InvariantCulture, - "{0} The system cannot be recovered after {1} retries. NumberofFailedMappers in the last try is {2}.", - FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers()); + "{0} The system cannot be recovered after {1} retries. 
NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.", + FailActionPrefix, _numberOfRetries, _evaluatorManager.NumberofFailedMappers(), _evaluatorManager.IsMasterEvaluatorFailed()); Exceptions.Throw(new ApplicationException(msg), Logger); } http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs new file mode 100644 index 0000000..08a9c7d --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +using System; +using System.Diagnostics; +using Org.Apache.REEF.Common.Context; +using Org.Apache.REEF.Common.Events; +using Org.Apache.REEF.Driver.Context; +using Org.Apache.REEF.IMRU.OnREEF.Driver; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Tang.Implementations.Tang; +using Org.Apache.REEF.Tang.Interface; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Utilities.Logging; +using Xunit; + +namespace Org.Apache.REEF.Tests.Functional.IMRU +{ + [Collection("FunctionalTests")] + public class TestFailUpdateEvaluatorOnWaitingForEvaluator : IMRUBrodcastReduceTestBase + { + /// <summary> + /// This test is to fail master evaluator at context start. The system will fail. + /// </summary> + [Fact] + public void TestFailUpdateEvaluatorAtContexStartOnLocalRuntime() + { + int chunkSize = 2; + int dims = 10; + int iterations = 1000; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 8; + int numberOfRetryInRecovery = 2; + string testFolder = DefaultRuntimeFolder + TestId; + TestBroadCastAndReduce(false, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, testFolder); + + string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 120); + var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], int[], int[]>.FailActionPrefix); + Assert.True(jobFailure > 0); + CleanUp(testFolder); + } + + /// <summary> + /// This test is to fail master evaluator at context start. The system will fail. + /// </summary> + [Fact(Skip = "Requires Yarn")] + public void TestFailUpdateEvaluatorAtContexStartOnOnYarn() + { + int chunkSize = 2; + int dims = 10; + int iterations = 1000; + int mapperMemory = 5120; + int updateTaskMemory = 5120; + int numTasks = 4; + int numberOfRetryInRecovery = 2; + TestBroadCastAndReduce(true, numTasks, chunkSize, dims, iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery); + } + + /// <summary> + /// This method defines event handlers for driver. 
As default, it uses all the handlers defined in IMRUDriver. + /// </summary> + /// <typeparam name="TMapInput"></typeparam> + /// <typeparam name="TMapOutput"></typeparam> + /// <typeparam name="TResult"></typeparam> + /// <typeparam name="TPartitionType"></typeparam> + /// <returns></returns> + protected override IConfiguration DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, TPartitionType>() + { + return REEF.Driver.DriverConfiguration.ConfigurationModule + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnDriverStarted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextActive, + GenericType<AnotherContextHandler>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnContextFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskRunning, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskFailed, + GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, TPartitionType>>.Class) + .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, GenericType<MessageLogger>.Class) + .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, TraceLevel.Info.ToString()) + .Build(); + } + + /// <summary> + /// Another context handler that will submit a child context if the active context received is master context + 
/// </summary> + internal sealed class AnotherContextHandler : IObserver<IActiveContext> + { + [Inject] + private AnotherContextHandler() + { + } + + public void OnCompleted() + { + throw new NotImplementedException(); + } + + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + public void OnNext(IActiveContext activeContext) + { + Logger.Log(Level.Info, "Receiving IActiveContext with context id {0}, Evaluator id : {1}.", activeContext.Id, activeContext.EvaluatorId); + if (activeContext.Id.Equals(IMRUConstants.MasterContextId)) + { + var contextConf = ContextConfiguration.ConfigurationModule + .Set(ContextConfiguration.Identifier, "KillEvaluatorContext") + .Build(); + + var childContextConf = + TangFactory.GetTang() + .NewConfigurationBuilder(contextConf) + .BindSetEntry<ContextConfigurationOptions.StartHandlers, KillEvaluatorContextStartHandler, IObserver<IContextStart>>(GenericType<ContextConfigurationOptions.StartHandlers>.Class, GenericType<KillEvaluatorContextStartHandler>.Class) + .Build(); + + activeContext.SubmitContext(childContextConf); + } + } + } + + /// <summary> + /// A Context start handler that is registered on a child context of master evaluator. + /// It will kill the evaluator to simulate evaluator failure. + /// </summary> + internal class KillEvaluatorContextStartHandler : IObserver<IContextStart> + { + [Inject] + private KillEvaluatorContextStartHandler() + { + } + + /// <summary> + /// Simulate to kill the Evaluator + /// </summary> + /// <param name="value">context start token</param> + public void OnNext(IContextStart value) + { + Environment.Exit(1); + } + + /// <summary> + /// Specifies what to do if error occurs. + /// </summary> + /// <param name="error">Exception</param> + public void OnError(Exception error) + { + throw new NotImplementedException(); + } + + /// <summary> + /// Specifies what to do at completion. 
+ /// </summary> + public void OnCompleted() + { + throw new NotImplementedException(); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj index 50a75e8..588baf1 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj +++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj @@ -125,6 +125,7 @@ under the License. <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" /> <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" /> <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" /> + <Compile Include="Functional\IMRU\TestFailUpdateEvaluatorOnWaitingForEvaluator.cs" /> <Compile Include="Functional\IMRU\TestFailMapperTasksOnDispose.cs" /> <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnDispose.cs" /> <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />