[ 
https://issues.apache.org/jira/browse/MAPREDUCE-3315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13246011#comment-13246011
 ] 

Nikhil S. Ketkar commented on MAPREDUCE-3315:
---------------------------------------------

Here is a brief description of the API in the current patch. This is quite 
preliminary and I will be improving on this based on feedback.

In order to implement a new Master-Worker job, the user has to implement 4 
classes, which are, WorkUnit, ResultUnit, Master and Worker. The WorkUnit and 
ResultUnit classes extend the MWMessage class which is an abstract class and is 
placed in the Master-Worker framework. Similarly, Master extends the 
MWApplicationMaster and Worker extends the MWWorkerRunner. Lets look at each of 
the classes one by one.

Here is the code for the WorkUnit. Note that here, a single integer has been 
added as a payload and it represents the data that the Master will populate and 
the Worker will work on. The framework passes around MWMessage objects and is 
unaware of the additional data that might be contained in the MWMessage. It is 
the users reponsibility to populate and extract the payload information (in the 
Master and Worker classes) and also provide methods to serialize and 
deserialize the payload data.

{code}
public class WorkUnit extends MWMessage {
  int data;

  public int getData() {
    return data;
  }

  public void setData(int data) {
    this.data = data;
  }

  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);
  }

  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
}
{code}

Similarly, here is the code for the ResultUnit. For our simple example its 
quite identical to the WorkUnit. As with the WorkUnit, the result unit also 
contains the payload and its the users responsibility to populate and extract 
the payload and provide functionality to serialize and deserialize the payload.

{code}
public class ResultUnit extends MWMessage {
  int data;
  
  public int getData() {
    return data;
  }

  public void setData(int data) {
    this.data = data;
  }

  @Override
  public void writeWorkUnit(DataOutput out) throws IOException {
    out.writeInt(data);   
  }

  @Override
  public void readFieldsWorkUnit(DataInput in) throws IOException {
    data = in.readInt();
  }
{code}

Now lets look at the API for the Worker. Any Worker should extend the 
MWWorkerRunner class. It should override the doWork method which basically 
receives a WorkUnit and returns a ResultUnit. For this simple example, I am 
simply populating the ResultUnit with the data in the WorkUnit.

{code}
public class MWWorker extends MWWorkerRunner {

  public static void main(String[] args) {
    MWWorker curr = new MWWorker();
    try {
      curr.init("localhost", 16001);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  @Override
  public MWMessage doWork(MWMessage workUnit) {
    ResultUnit result = new ResultUnit();
    int got = ((WorkUnit) workUnit).getData();
    result.setData(got);
    return result;
  }

}
{code}

Now, on to the Master. Any Master extends the MWApplicationMaster class and 
overrides the manageWorkers method. There are 4 methods in MWApplicationMaster 
that the master can use. The addWorker method which simply adds a worker. 
Similarly, there is a killWorker method that kills a worker. This basically 
allows the user to add workers and get rid of them based on the work load. To 
assign work, the user can use the addWork method which takes the WorkUnit as a 
parameter. This is a non-blocking call. To get a ResultUnit the user can use 
the waitForResult method which returns a ResultUnit. This is a blocking call.

{code}
public class MWMaster extends MWApplicationMaster {
  private static final Log LOG = LogFactory.getLog(MWMaster.class);
  
  public static void main(String[] args) throws InterruptedException, 
ParseException, IOException, URISyntaxException {
    MWMaster curr = new MWMaster();
    curr.initiate(args);
    curr.terminate();
  }
  
  @Override
  public void manageWorkers() {

  addWorker();
  addWorker();
    
   for(int i = 0; i < 100; i++) {
     WorkUnit curr = new WorkUnit();
     curr.setData(i);
     addWork(curr);
   }

   for(int i = 0; i < 100; i++) {
    try {
      ResultUnit curr = (ResultUnit) waitForResult();
      LOG.info("Receiveing Result" + curr.getData());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
   }

   killWorker();
   killWorker();
  }
}
{code}

Currenly, I have not provided Client API for submission. The Client is 
basically a part of the framework, the code of this can be found in the 
MWClient class.

Here is how the example application is to be launched. There are 4 required 
parameters, the MasterWorker Library Jar (--masterworkerlib) which contains the 
client code, the MasterWorker Application Jar (masterworkerapp) which contains 
the users application, and the main classes for the Master and the Worker 
(masterclass and workerclass) respectively.
{code}
hadoop jar masterworker-0.0.1-SNAPSHOT.jar 
org.apache.hadoop.yarn.applications.masterworker.MWClient --masterworkerlib 
hadoop-yarn-applications-masterworker-core-3.0.0-SNAPSHOT.jar --masterworkerapp 
hadoop-yarn-applications-masterworker-example-3.0.0-SNAPSHOT.jar --masterclass 
org.apache.hadoop.yarn.applications.masterworkerexample.MWMaster --workerclass 
org.apache.hadoop.yarn.applications.masterworkerexample.MWWorker
{code}  
                
> Master-Worker Application on YARN
> ---------------------------------
>
>                 Key: MAPREDUCE-3315
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3315
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>            Reporter: Sharad Agarwal
>            Assignee: Sharad Agarwal
>             Fix For: 0.24.0
>
>         Attachments: MAPREDUCE-3315.patch
>
>
> Currently master worker scenarios are forced fit into Map-Reduce. Now with 
> YARN, these can be first class and would benefit real/near realtime workloads 
> and be more effective in using the cluster resources.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to