Repository: reef
Updated Branches:
  refs/heads/master 08f096a27 -> 18c25fdcf


[REEF-1641] Remove batch id and change the way master/slave evaluators are requested

Currently in IMRU, we have different specs for master vs. mapper evaluators.
We use EvaluatorBatchId to distinguish which kind of evaluator was requested
when receiving an IAllocatedEvaluator.
However, EvaluatorBatchId may not be supported in some environments, such as
HDInsight, so we need a more reliable way to decide which evaluator is the master.

This PR changes the driver to request a single evaluator for the master first.
Once it is allocated, it is recorded as the master, and only then are the
mapper evaluators requested.

This relies on the assumption that if the master fails at any stage, the whole
system fails; a failed master evaluator is no longer re-requested.

* IMRU Driver and Evaluator Manager are updated
* Existing test cases are updated
* A new test case is added to simulate a master evaluator failure during the
WaitingForEvaluator phase.

JIRA:
  [REEF-1641](https://issues.apache.org/jira/browse/REEF-1641)

Pull request:
  This closes #1156


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/18c25fdc
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/18c25fdc
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/18c25fdc

Branch: refs/heads/master
Commit: 18c25fdcfabb7ba1c56ea374d0d7f6777d99d7fd
Parents: 08f096a
Author: Julia Wang <jwang98...@yahoo.com>
Authored: Fri Oct 14 14:26:36 2016 -0700
Committer: Mariia Mykhailova <mar...@apache.org>
Committed: Mon Oct 17 11:21:41 2016 -0700

----------------------------------------------------------------------
 .../TestEvaluatorManager.cs                     |  36 ++--
 .../OnREEF/Driver/EvaluatorManager.cs           |  61 +++---
 .../OnREEF/Driver/IMRUDriver.cs                 |  46 ++---
 ...tFailUpdateEvaluatorOnWaitingForEvaluator.cs | 186 +++++++++++++++++++
 .../Org.Apache.REEF.Tests.csproj                |   1 +
 5 files changed, 253 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
index 1a27917..648d0f5 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/TestEvaluatorManager.cs
@@ -29,8 +29,6 @@ namespace Org.Apache.REEF.IMRU.Tests
     public sealed class TestEvaluatorManager
     {
         private const string EvaluatorIdPrefix = "EvaluatorId";
-        private int _masterBatchIdSquenceNumber = 0;
-        private int _mapperBatchIdSquenceNumber = 0;
 
         /// <summary>
         /// Test valid add, remove Evaluators
@@ -46,13 +44,13 @@ namespace Org.Apache.REEF.IMRU.Tests
             
Assert.True(evaluatorManager.IsAllocatedEvaluator(EvaluatorIdPrefix + 2));
             Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
 
-            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 1);
+            evaluatorManager.RecordFailedEvaluator(EvaluatorIdPrefix + 2);
             Assert.Equal(2, evaluatorManager.NumberOfAllocatedEvaluators);
-            Assert.True(evaluatorManager.IsMasterEvaluatorFailed());
-            Assert.Equal(0, evaluatorManager.NumberofFailedMappers());
+            Assert.False(evaluatorManager.IsMasterEvaluatorFailed());
+            Assert.Equal(1, evaluatorManager.NumberofFailedMappers());
 
             evaluatorManager.ResetFailedEvaluators();
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, 
EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2));
             Assert.True(evaluatorManager.AreAllEvaluatorsAllocated());
         }
 
@@ -63,9 +61,9 @@ namespace Org.Apache.REEF.IMRU.Tests
         public void TestNoMasterEvaluator()
         {
             var evaluatorManager = CreateEvaluatorManager(3, 1);
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1));
+            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2));
+            Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -76,9 +74,9 @@ namespace Org.Apache.REEF.IMRU.Tests
         public void TestTwoMasterEvaluator()
         {
             var evaluatorManager = CreateEvaluatorManager(3, 1);
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, 
EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, 
EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1));
+            
evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(2));
+            Action add = () => 
evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(3));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -89,9 +87,9 @@ namespace Org.Apache.REEF.IMRU.Tests
         public void TestTooManyEvaluators()
         {
             var evaluatorManager = CreateEvaluatorManager(2, 1);
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, 
EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
-            Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+            
evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(1));
+            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(2));
+            Action add = () => 
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(3));
             Assert.Throws<IMRUSystemException>(add);
         }
 
@@ -144,10 +142,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         private EvaluatorManager CreateTestEvaluators(int totalEvaluators, int 
allowedNumberOfEvaluatorFailures)
         {
             var evaluatorManager = CreateEvaluatorManager(totalEvaluators, 
allowedNumberOfEvaluatorFailures);
-            
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(1, 
EvaluatorManager.MasterBatchId + _masterBatchIdSquenceNumber++));
+            
evaluatorManager.AddMasterEvaluator(CreateMockAllocatedEvaluator(1));
             for (var i = 2; i <= totalEvaluators; i++)
             {
-                
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(i, 
EvaluatorManager.MapperBatchId + _mapperBatchIdSquenceNumber++));
+                
evaluatorManager.AddAllocatedEvaluator(CreateMockAllocatedEvaluator(i));
             }
             return evaluatorManager;
         }
@@ -156,12 +154,10 @@ namespace Org.Apache.REEF.IMRU.Tests
         /// Create a mocked IAllocatedEvaluator
         /// </summary>
         /// <param name="id"></param>
-        /// <param name="batchId"></param>
         /// <returns></returns>
-        private static IAllocatedEvaluator CreateMockAllocatedEvaluator(int 
id, string batchId)
+        private static IAllocatedEvaluator CreateMockAllocatedEvaluator(int id)
         {
             IAllocatedEvaluator mockAllocatedEvaluator = 
Substitute.For<IAllocatedEvaluator>();
-            mockAllocatedEvaluator.EvaluatorBatchId.Returns(batchId);
             mockAllocatedEvaluator.Id.Returns(EvaluatorIdPrefix + id);
             return mockAllocatedEvaluator;
         }

http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
index 5f6856c..fdbb463 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/EvaluatorManager.cs
@@ -34,10 +34,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
     internal sealed class EvaluatorManager
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(EvaluatorManager));
-        internal const string MasterBatchId = "MasterBatchId";
-        internal const string MapperBatchId = "MapperBatchId";
-        private int _masterBatchIdSquenceNumber = 0;
-        private int _mapperBatchIdSquenceNumber = 0;
 
         private readonly ISet<string> _allocatedEvaluatorIds = new 
HashSet<string>();
         private readonly ISet<string> _failedEvaluatorIds = new 
HashSet<string>();
@@ -82,17 +78,13 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     .SetCores(_updateEvaluatorSpecification.Core)
                     .SetMegabytes(_updateEvaluatorSpecification.Megabytes)
                     .SetNumber(1)
-                    .SetEvaluatorBatchId(MasterBatchId + 
_masterBatchIdSquenceNumber)
                     .Build());
 
             var message = string.Format(CultureInfo.InvariantCulture,
-                "Submitted master evaluator with core [{0}], memory [{1}] and 
batch id [{2}].",
+                "Submitted master evaluator with core [{0}], memory [{1}].",
                 _updateEvaluatorSpecification.Core,
-                _updateEvaluatorSpecification.Megabytes,
-                MasterBatchId + _masterBatchIdSquenceNumber);
+                _updateEvaluatorSpecification.Megabytes);
             Logger.Log(Level.Info, message);
-
-            _masterBatchIdSquenceNumber++;
         }
 
         /// <summary>
@@ -106,29 +98,24 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     .SetMegabytes(_mapperEvaluatorSpecification.Megabytes)
                     .SetNumber(numEvaluators)
                     .SetCores(_mapperEvaluatorSpecification.Core)
-                    .SetEvaluatorBatchId(MapperBatchId + 
_mapperBatchIdSquenceNumber)
                     .Build());
 
             var message = string.Format(CultureInfo.InvariantCulture,
-                "Submitted [{0}] mapper evaluators with core [{1}], memory 
[{2}] and batch id [{3}].",
+                "Submitted [{0}] mapper evaluators with core [{1}], memory 
[{2}].",
                 numEvaluators,
                 _mapperEvaluatorSpecification.Core,
-                _mapperEvaluatorSpecification.Megabytes,
-                MasterBatchId + _mapperBatchIdSquenceNumber);
+                _mapperEvaluatorSpecification.Megabytes);
             Logger.Log(Level.Info, message);
-
-            _mapperBatchIdSquenceNumber++;
         }
 
         /// <summary>
         /// Add an Evaluator id to _allocatedEvaluators.
-        /// If the IAllocatedEvaluator is for master, set master Evaluator id
         /// IMRUSystemException will be thrown in the following cases:
         ///   The Evaluator Id is already in the allocated Evaluator collection
         ///   The added IAllocatedEvaluator is the last one expected, and 
master Evaluator is still not added yet
         ///   The number of AllocatedEvaluators has reached the total expected 
Evaluators
         /// </summary>
-        /// <param name="evaluator"></param>
+        /// <param name="evaluator">Evaluator to add</param>
         internal void AddAllocatedEvaluator(IAllocatedEvaluator evaluator)
         {
             if (IsAllocatedEvaluator(evaluator.Id))
@@ -137,17 +124,12 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                 Exceptions.Throw(new IMRUSystemException(msg), Logger);
             }
 
-            if (IsEvaluatorForMaster(evaluator))
-            {
-                SetMasterEvaluatorId(evaluator.Id);
-            }
-
             if (NumberOfAllocatedEvaluators >= _totalExpectedEvaluators)
             {
                 string msg = string.Format("Trying to add an additional 
Evaluator {0}, but the total expected Evaluator number {1} has been reached.", 
evaluator.Id, _totalExpectedEvaluators);
                 Exceptions.Throw(new IMRUSystemException(msg), Logger);
             }
-            
+           
             _allocatedEvaluatorIds.Add(evaluator.Id);
 
             if (_masterEvaluatorId == null && NumberOfAllocatedEvaluators == 
_totalExpectedEvaluators)
@@ -158,6 +140,16 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Add master evaluator
+        /// </summary>
+        /// <param name="evaluator">Evaluator to add</param>
+        internal void AddMasterEvaluator(IAllocatedEvaluator evaluator)
+        {
+            SetMasterEvaluatorId(evaluator.Id);
+            _allocatedEvaluatorIds.Add(evaluator.Id);
+        }
+
+        /// <summary>
         /// Remove an Evaluator from allocated Evaluator collection by 
evaluator id.
         /// If the given evaluator id is not in allocated Evaluator 
collection, throw IMRUSystemException.
         /// </summary>
@@ -217,7 +209,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Remove failed evaluator from the colletion
+        /// Remove failed evaluator from the collection
         /// </summary>
         /// <param name="evaluatorId"></param>
         internal void RemoveFailedEvaluator(string evaluatorId)
@@ -292,16 +284,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
-        /// Checks if the IAllocatedEvaluator is for master
-        /// </summary>
-        /// <param name="evaluator"></param>
-        /// <returns></returns>
-        internal bool IsEvaluatorForMaster(IAllocatedEvaluator evaluator)
-        {
-            return evaluator.EvaluatorBatchId.StartsWith(MasterBatchId);
-        }
-
-        /// <summary>
         /// Checks if the evaluator id is the master evaluator id
         /// </summary>
         /// <param name="evaluatorId"></param>
@@ -316,6 +298,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         }
 
         /// <summary>
+        /// Returns true if the master evaluator has been allocated otherwise 
false
+        /// </summary>
+        /// <returns></returns>
+        internal bool IsMasterEvaluatorAllocated()
+        {
+            return _masterEvaluatorId != null;
+        }
+
+        /// <summary>
         /// Checks if the master Evaluator failed
         /// </summary>
         /// <returns></returns>

http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs 
b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index a468e5e..3284859 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -181,7 +181,6 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             //// TODO[REEF-598]: Set a timeout for this request to be 
satisfied. If it is not within that time, exit the Driver.
             _evaluatorManager.RequestUpdateEvaluator();
-            _evaluatorManager.RequestMapEvaluators(_totalMappers);
         }
         #endregion IDriverStarted
 
@@ -199,7 +198,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         /// <param name="allocatedEvaluator">The allocated evaluator</param>
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
-            Logger.Log(Level.Info, "AllocatedEvaluator EvaluatorBatchId [{0}], 
memory [{1}], systemState {2}.", allocatedEvaluator.EvaluatorBatchId, 
allocatedEvaluator.GetEvaluatorDescriptor().Memory, _systemState.CurrentState);
+            Logger.Log(Level.Info, "AllocatedEvaluator memory [{0}], 
systemState {1}.", allocatedEvaluator.GetEvaluatorDescriptor().Memory, 
_systemState.CurrentState);
             lock (_lock)
             {
                 using (Logger.LogFunction("IMRUDriver::IAllocatedEvaluator"))
@@ -207,7 +206,15 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.WaitingForEvaluator:
-                            
_evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator);
+                            if 
(!_evaluatorManager.IsMasterEvaluatorAllocated())
+                            {
+                                
_evaluatorManager.AddMasterEvaluator(allocatedEvaluator);
+                                
_evaluatorManager.RequestMapEvaluators(_totalMappers);
+                            }
+                            else
+                            {
+                                
_evaluatorManager.AddAllocatedEvaluator(allocatedEvaluator);
+                            }
                             SubmitContextAndService(allocatedEvaluator);
                             break;
                         case SystemState.Fail:
@@ -232,7 +239,7 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         private void SubmitContextAndService(IAllocatedEvaluator 
allocatedEvaluator)
         {
             ContextAndServiceConfiguration configs;
-            if (_evaluatorManager.IsEvaluatorForMaster(allocatedEvaluator))
+            if (_evaluatorManager.IsMasterEvaluatorId(allocatedEvaluator.Id))
             {
                 configs =
                     _serviceAndContextConfigurationProvider
@@ -502,26 +509,21 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
                     switch (_systemState.CurrentState)
                     {
                         case SystemState.WaitingForEvaluator:
-                            if 
(!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures())
+                            if 
(!_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures() && !isMaster)
                             {
-                                if (isMaster)
-                                {
-                                    Logger.Log(Level.Info, "Requesting a 
master Evaluator.");
-                                    
_evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
-                                    _evaluatorManager.RequestUpdateEvaluator();
-                                }
-                                else
-                                {
-                                    
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
-                                        failedEvaluator.Id);
-                                    Logger.Log(Level.Info, "Requesting mapper 
Evaluators.");
-                                    
_evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
-                                    _evaluatorManager.RequestMapEvaluators(1);
-                                }
+                                
_serviceAndContextConfigurationProvider.RemoveEvaluatorIdFromPartitionIdProvider(
+                                    failedEvaluator.Id);
+                                Logger.Log(Level.Info, "Requesting mapper 
Evaluators.");
+                                
_evaluatorManager.RemoveFailedEvaluator(failedEvaluator.Id);
+                                _evaluatorManager.RequestMapEvaluators(1);
                             }
                             else
                             {
-                                Logger.Log(Level.Error, "The system is not 
recoverable, change the state to Fail.");
+                                var reason1 = 
_evaluatorManager.ExceededMaximumNumberOfEvaluatorFailures()
+                                    ? "it exceeded 
MaximumNumberOfEvaluatorFailures, "
+                                    : "";
+                                var reason2 = isMaster ? "master evaluator 
failed, " : "";
+                                Logger.Log(Level.Error, "The system is not 
recoverable because " +  reason1 + reason2 + " changing the system state to 
Fail.");
                                 
_systemState.MoveNext(SystemStateEvent.NotRecoverable);
                                 FailAction();
                             }
@@ -712,8 +714,8 @@ namespace Org.Apache.REEF.IMRU.OnREEF.Driver
         {
             ShutDownAllEvaluators();
             var msg = string.Format(CultureInfo.InvariantCulture,
-                "{0} The system cannot be recovered after {1} retries. 
NumberofFailedMappers in the last try is {2}.",
-                FailActionPrefix, _numberOfRetries, 
_evaluatorManager.NumberofFailedMappers());
+                "{0} The system cannot be recovered after {1} retries. 
NumberofFailedMappers in the last try is {2}, master evaluator failed is {3}.",
+                FailActionPrefix, _numberOfRetries, 
_evaluatorManager.NumberofFailedMappers(), 
_evaluatorManager.IsMasterEvaluatorFailed());
             Exceptions.Throw(new ApplicationException(msg), Logger);
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs
new file mode 100644
index 0000000..08a9c7d
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/IMRU/TestFailUpdateEvaluatorOnWaitingForEvaluator.cs
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Diagnostics;
+using Org.Apache.REEF.Common.Context;
+using Org.Apache.REEF.Common.Events;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.IMRU.OnREEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Xunit;
+
+namespace Org.Apache.REEF.Tests.Functional.IMRU
+{
+    [Collection("FunctionalTests")]
+    public class TestFailUpdateEvaluatorOnWaitingForEvaluator : 
IMRUBrodcastReduceTestBase
+    {
+        /// <summary>
+        /// This test is to fail master evaluator at context start. The system 
will fail. 
+        /// </summary>
+        [Fact]
+        public void TestFailUpdateEvaluatorAtContexStartOnLocalRuntime()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 1000;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 8;
+            int numberOfRetryInRecovery = 2;
+            string testFolder = DefaultRuntimeFolder + TestId;
+            TestBroadCastAndReduce(false, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery, 
testFolder);
+
+            string[] lines = ReadLogFile(DriverStdout, "driver", testFolder, 
120);
+            var jobFailure = GetMessageCount(lines, IMRUDriver<int[], int[], 
int[], int[]>.FailActionPrefix);
+            Assert.True(jobFailure > 0);
+            CleanUp(testFolder);
+        }
+
+        /// <summary>
+        /// This test is to fail master evaluator at context start. The system 
will fail. 
+        /// </summary>
+        [Fact(Skip = "Requires Yarn")]
+        public void TestFailUpdateEvaluatorAtContexStartOnOnYarn()
+        {
+            int chunkSize = 2;
+            int dims = 10;
+            int iterations = 1000;
+            int mapperMemory = 5120;
+            int updateTaskMemory = 5120;
+            int numTasks = 4;
+            int numberOfRetryInRecovery = 2;
+            TestBroadCastAndReduce(true, numTasks, chunkSize, dims, 
iterations, mapperMemory, updateTaskMemory, numberOfRetryInRecovery);
+        }
+
+        /// <summary>
+        /// This method defines event handlers for driver. As default, it uses 
all the handlers defined in IMRUDriver.
+        /// </summary>
+        /// <typeparam name="TMapInput"></typeparam>
+        /// <typeparam name="TMapOutput"></typeparam>
+        /// <typeparam name="TResult"></typeparam>
+        /// <typeparam name="TPartitionType"></typeparam>
+        /// <returns></returns>
+        protected override IConfiguration 
DriverEventHandlerConfigurations<TMapInput, TMapOutput, TResult, 
TPartitionType>()
+        {
+            return REEF.Driver.DriverConfiguration.ConfigurationModule
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorAllocated,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnDriverStarted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextActive,
+                    GenericType<AnotherContextHandler>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnEvaluatorFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnContextFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskRunning,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskFailed,
+                    GenericType<IMRUDriver<TMapInput, TMapOutput, TResult, 
TPartitionType>>.Class)
+                .Set(REEF.Driver.DriverConfiguration.OnTaskCompleted, 
GenericType<MessageLogger>.Class)
+                .Set(REEF.Driver.DriverConfiguration.CustomTraceLevel, 
TraceLevel.Info.ToString())
+                .Build();
+        }
+
+        /// <summary>
+        /// Another context handler that will submit a child context if the 
active context received is master context
+        /// </summary>
+        internal sealed class AnotherContextHandler : IObserver<IActiveContext>
+        {
+            [Inject]
+            private AnotherContextHandler()
+            {                
+            }
+
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            public void OnNext(IActiveContext activeContext)
+            {
+                Logger.Log(Level.Info, "Receiving IActiveContext with context 
id {0}, Evaluator id : {1}.", activeContext.Id, activeContext.EvaluatorId);
+                if (activeContext.Id.Equals(IMRUConstants.MasterContextId))
+                {
+                    var contextConf = ContextConfiguration.ConfigurationModule
+                        .Set(ContextConfiguration.Identifier, 
"KillEvaluatorContext")
+                        .Build();
+
+                    var childContextConf =
+                        TangFactory.GetTang()
+                            .NewConfigurationBuilder(contextConf)
+                            
.BindSetEntry<ContextConfigurationOptions.StartHandlers, 
KillEvaluatorContextStartHandler, 
IObserver<IContextStart>>(GenericType<ContextConfigurationOptions.StartHandlers>.Class,
 GenericType<KillEvaluatorContextStartHandler>.Class)
+                            .Build();
+
+                    activeContext.SubmitContext(childContextConf);
+                }
+            }
+        }
+
+        /// <summary>
+        /// A Context start handler that is registered on a child context of 
master evaluator.
+        /// It will kill the evaluator to simulate evaluator failure.
+        /// </summary>
+        internal class KillEvaluatorContextStartHandler : 
IObserver<IContextStart>
+        {
+            [Inject]
+            private KillEvaluatorContextStartHandler()
+            {
+            }
+
+            /// <summary>
+            /// Simulate to kill the Evaluator
+            /// </summary>
+            /// <param name="value">context start token</param>
+            public void OnNext(IContextStart value)
+            {
+                Environment.Exit(1);
+            }
+
+            /// <summary>
+            /// Specifies what to do if error occurs. 
+            /// </summary>
+            /// <param name="error">Exception</param>
+            public void OnError(Exception error)
+            {
+                throw new NotImplementedException();
+            }
+
+            /// <summary>
+            /// Specifies what to do at completion. 
+            /// </summary>
+            public void OnCompleted()
+            {
+                throw new NotImplementedException();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/18c25fdc/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index 50a75e8..588baf1 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -125,6 +125,7 @@ under the License.
     <Compile Include="Functional\IMRU\IMRUBrodcastReduceTestBase.cs" />
     <Compile Include="Functional\IMRU\IMRUCloseTaskTest.cs" />
     <Compile Include="Functional\IMRU\IMRUMapperCountTest.cs" />
+    <Compile 
Include="Functional\IMRU\TestFailUpdateEvaluatorOnWaitingForEvaluator.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperTasksOnDispose.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnDispose.cs" />
     <Compile Include="Functional\IMRU\TestFailMapperEvaluatorsOnInit.cs" />

Reply via email to