Author: degenaro
Date: Wed Feb  6 19:35:53 2013
New Revision: 1443158

URL: http://svn.apache.org/viewvc?rev=1443158&view=rev
Log:
UIMA-2642 Replace Camel listening mechanism with WS polling mechanism for Job 
status monitoring

Modified:
    
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java

Modified: 
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
URL: 
http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java?rev=1443158&r1=1443157&r2=1443158&view=diff
==============================================================================
--- 
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
 (original)
+++ 
uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java
 Wed Feb  6 19:35:53 2013
@@ -19,22 +19,17 @@
 package org.apache.uima.ducc.cli;
 
 import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.camel.component.ActiveMQComponent;
-import org.apache.camel.Body;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -43,20 +38,17 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.uima.ducc.api.DuccMessage;
 import org.apache.uima.ducc.api.IDuccMessageProcessor;
+import org.apache.uima.ducc.common.json.MonitorInfo;
 import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
-import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher;
-import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent;
-import org.apache.uima.ducc.transport.event.common.DuccWorkJob;
-import org.apache.uima.ducc.transport.event.common.IDuccState.JobState;
-import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType;
-import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener;
 
+import com.google.gson.Gson;
 
 /**
  * Monitor a DUCC job
  */
 
-public class DuccJobMonitor extends DuccUi implements 
DuccEventDelegateListener {
+public class DuccJobMonitor extends DuccUi {
+private Thread main = null;
        
        private static final int RC_UNKNOWN = -1;
        private static final int RC_SUCCESS = 0;
@@ -66,44 +58,34 @@ public class DuccJobMonitor extends Ducc
        
        private static final String NotFound = "NotFound";
        
-       private AtomicBoolean stopped = new AtomicBoolean(false);
-       private AtomicBoolean jobActive = new AtomicBoolean(true);
-       private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
-       
-       private String lastMessage = "";
+       private static final String StateRunning        = "Running";
+       private static final String StateCompleting = "Completing";
+       private static final String StateCompleted      = "Completed";
        
-       private Thread main = null;
-       
-       private CamelContext context;
-       private ActiveMQComponent amqc;
+       private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN);
+       private AtomicBoolean cancel_job_on_interrupt = new 
AtomicBoolean(false);
        
-       private String broker = DuccUiUtilities.buildBrokerUrl();
-       private String endpoint = 
DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider)
-                                               + ":"
-                                               + 
DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint_type)
-                                               + ":"
-                                               + 
DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint)
-                                               ;
-       private String jmsProvider = 
DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider);
-
        private String jobId = null;
-       private DuccWorkJob job = null;
        
        private boolean info = true;
+       private boolean error = true;
        private boolean debug = false;
-       private boolean cancel_job_on_interrupt = false;
        
-       private int milliseconds = 1;
-       private int seconds              = 1000*milliseconds;
-       private int wakeupTime   = 60*seconds;
-
-       private int MAXLINES = 2000;
+       private boolean timestamp = false;
+       
+       private int milliseconds        = 1;
+       private int seconds                     = 1000*milliseconds;
+       private int wakeupInterval      = 15*seconds;
+       
+       private int urlTimeout = 60*seconds;
        
        private IDuccMessageProcessor duccMessageProcessor = new DuccMessage();
        
+       DuccPropertiesResolver duccPropertiesResolver;
+
        private void debug(String message) {
                if(debug) {
-                       duccMessageProcessor.out(message);
+                       duccMessageProcessor.out(timestamp(message));
                }
        }
        
@@ -115,10 +97,25 @@ public class DuccJobMonitor extends Ducc
        
        private void info(String message) {
                if(info) {
-                       duccMessageProcessor.out(message);
+                       duccMessageProcessor.out(timestamp(message));
                }
        }
        
+       private void error(String message) {
+               if(error) {
+                       duccMessageProcessor.out(timestamp(message));
+               }
+       }
+       
+       private String timestamp(String message) {
+               String tMessage = message;
+               if(timestamp) {
+                       String date = new 
java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date());
+                       tMessage = date+" "+message;
+               }
+               return tMessage;
+       }
+       
        public DuccJobMonitor() {
        }
        
@@ -154,153 +151,87 @@ public class DuccJobMonitor extends Ducc
                                
.withLongOpt(DuccUiConstants.name_service_endpoint).create());
        }
        
-       private void start() {
-               context = new DefaultCamelContext();
-               amqc = ActiveMQComponent.activeMQComponent(broker);
-        context.addComponent(jmsProvider, amqc);
-        try {
-                       
context.addRoutes(this.routeBuilderForIncomingRequests(endpoint, this));
-               } catch (Exception e) {
-                       duccMessageProcessor.exception(e);
-               }
-               try {
-                       context.start();
-               } catch (Exception e) {
-                       duccMessageProcessor.exception(e);
-               }
+       protected void help(Options options) {
+               HelpFormatter formatter = new HelpFormatter();
+               formatter.setWidth(DuccUiConstants.help_width);
+               formatter.printHelp(DuccJobMonitor.class.getName(), options);
+               rc.set(RC_HELP);
+               return;
        }
        
-       private void stop() {
-               while(!stopped.get()) {
-                       boolean success = stopped.compareAndSet(false, true);
-                       if(success) {
-                               try {
-                                       CamelUtil.stop(context);
-                               } catch (Exception e) {
-                                       duccMessageProcessor.exception(e);
-                               }
-                       }
+       private String getSingleLineStatus(String urlString) {
+               String line = null;
+               URL url = null;
+               try {
+                   url = new URL(urlString);
+                   URLConnection uc = url.openConnection();
+                   uc.setReadTimeout(urlTimeout); 
+                   BufferedReader br = new BufferedReader(new 
InputStreamReader(uc.getInputStream()));
+                   line = br.readLine();
+                   br.close();
+               } 
+               catch (MalformedURLException e) {
+                   e.printStackTrace();
                }
+               catch(IOException e) {
+                       e.printStackTrace();
+               } 
+               return line;
        }
        
-       public synchronized RouteBuilder routeBuilderForIncomingRequests(final 
String endpoint, final DuccJobMonitor delegate) {
-        return new RouteBuilder() {
-            public void configure() {
-               //System.out.println("..... Defining Router on 
endpoint:"+endpoint);
-               from(endpoint)
-               //.unmarshal().xstream()
-               .process(new MonitorProcessor())
-               .bean(delegate);
-            }
-        };
-       }
-       
-       public class MonitorProcessor implements Processor {
-               public void process( Exchange ex ) {
-                       //System.out.println("..... Monitor received an event 
....");
+       private String getUrlString(String id) {
+               String host = 
duccPropertiesResolver.getFileProperty("ducc.ws.node");
+               if(host == null) {
+                       host = 
duccPropertiesResolver.getFileProperty("ducc.head");
                }
+               String port = 
duccPropertiesResolver.getFileProperty("ducc.ws.port");
+               String urlString = 
"http://"+host+":"+port+"/ducc-servlet/proxy-job-status?id="+id;
+               debug(urlString);
+               return urlString;
        }
        
-       boolean isWorkCompleted(String v1, String v2) {
-               boolean retVal = false;
+       private void adjustWakeupInterval() {
+               String rate = 
duccPropertiesResolver.getFileProperty("ducc.orchestrator.state.publish.rate");
                try {
-                       int intValue1 = Integer.parseInt(v1);
-                       int intValue2 = Integer.parseInt(v2);
-                       if(intValue1 >= 0) {
-                               if(intValue1 == intValue2) {
-                                       retVal = true;
-                               }
-                       }
+                       wakeupInterval = Integer.parseInt(rate);
                }
                catch(Exception e) {
-               }
-               return retVal;
-       }
-       
-       public void onOrchestratorStateDuccEvent(@Body 
OrchestratorStateDuccEvent duccEvent) throws Exception {
-               synchronized(jobActive) {
-                       if(jobActive.get()) {
-                               
debug("JobCount:"+duccEvent.getWorkMap().getJobCount());
-                               job = (DuccWorkJob) 
duccEvent.getWorkMap().findDuccWork(DuccType.Job, jobId);
-                               if(job == null) {
-                                       StringBuffer message = new 
StringBuffer();
-                                       message.append("id:"+jobId);
-                                       message.append(" ");
-                                       message.append("state:"+NotFound);
-                                       info(message.toString());
-                                       jobActive.set(false);
-                               }
-                               else {
-                                       JobState jobState = (JobState) 
job.getStateObject();
-                                       String total = 
job.getSchedulingInfo().getWorkItemsTotal();
-                                       String completed = 
job.getSchedulingInfo().getWorkItemsCompleted();
-                                       String error = 
job.getSchedulingInfo().getWorkItemsError();
-                                       String retry = 
job.getSchedulingInfo().getWorkItemsRetry();
-                                       StringBuffer messageBuffer = new 
StringBuffer();
-                                       String message = 
messageBuffer.toString();
-                                       messageBuffer.append("id:"+jobId);
-                                       messageBuffer.append(" ");
-                                       messageBuffer.append("state:"+jobState);
-                                       switch(jobState) {
-                                       case Completed:
-                                               int count = 
job.getProcessMap().getAliveProcessCount();
-                                               if(count > 0) {
-                                                       messageBuffer.append(" 
");
-                                                       
messageBuffer.append("processes stopping:"+count);
-                                               }
-                                               else {
-                                                       jobActive.set(false);
-                                                       
if(isWorkCompleted(total,completed)) {
-                                                               
rc.set(RC_SUCCESS);
-                                                       }
-                                                       else {
-                                                               
rc.set(RC_FAILURE);
-                                                       }
-                                               }
-                                       case Running:
-                                       case Completing:
-                                               messageBuffer.append(" ");
-                                               
messageBuffer.append("total:"+total);
-                                               messageBuffer.append(" ");
-                                               
messageBuffer.append("done:"+completed);
-                                               messageBuffer.append(" ");
-                                               
messageBuffer.append("error:"+error);
-                                               messageBuffer.append(" ");
-                                               
messageBuffer.append("retry:"+retry);
-                                               break;
-                                       }
-                                       message = messageBuffer.toString();
-                                       synchronized(lastMessage) {
-                                               
if(!message.equals(lastMessage)) {
-                                                       
info(message.toString());
-                                                       lastMessage = message;
-                                               }
-                                       }
-                               }
-                               if(!jobActive.get()) {
-                                       main.interrupt();
-                               }       
-                       }
-                       else {
-                               debug("OR publication ignored...job not 
active");
-                       }
+                       debug(e);
                }
        }
        
-       @Override
-       public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) 
{
-               throw new RuntimeException();
-       }
-       
-       protected int help(Options options) {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setWidth(DuccUiConstants.help_width);
-               formatter.printHelp(DuccJobMonitor.class.getName(), options);
-               return 1;
+       private String details(MonitorInfo monitorInfo) {
+               StringBuffer sb = new StringBuffer();
+               sb.append(" ");
+               sb.append("total:");
+               sb.append(monitorInfo.total);
+               sb.append(" ");
+               sb.append("done:");
+               sb.append(monitorInfo.done);
+               sb.append(" ");
+               sb.append("error:");
+               sb.append(monitorInfo.error);
+               sb.append(" ");
+               sb.append("retry:");
+               sb.append(monitorInfo.retry);
+               sb.append(" ");
+               sb.append("procs:");
+               sb.append(monitorInfo.procs);
+               return sb.toString();
        }
        
        public int run(String[] args) throws Exception {
                /*
+                * require DUCC_HOME 
+                */
+               String ducc_home_key = "DUCC_HOME";
+               String ducc_home = System.getenv(ducc_home_key);
+               if(ducc_home == null) {
+                       duccMessageProcessor.err("missing required environment 
variable: "+ducc_home_key);
+                       rc.set(RC_FAILURE);
+                       return rc.get();
+               }
+               /*
                 * parser is not thread safe?
                 */
                synchronized(DuccUi.class) {
@@ -312,19 +243,12 @@ public class DuccJobMonitor extends Ducc
                         * give help & exit when requested
                         */
                        if (commandLine.hasOption(DuccUiConstants.name_help)) {
-                               return help(options);
+                               help(options);
+                               return rc.get();
                        }
                        if(commandLine.getOptions().length == 0) {
-                               return help(options);
-                       }
-                       /*
-                        * require DUCC_HOME 
-                        */
-                       String ducc_home_key = "DUCC_HOME";
-                       String ducc_home = System.getenv(ducc_home_key);
-                       if(ducc_home == null) {
-                               duccMessageProcessor.err("missing required 
environment variable: "+ducc_home_key);
-                               return 1;
+                               help(options);
+                               return rc.get();
                        }
                        /*
                         * detect duplicate options
@@ -336,20 +260,19 @@ public class DuccJobMonitor extends Ducc
                         * timestamp
                         */
                        if 
(commandLine.hasOption(DuccUiConstants.name_timestamp)) {
-                               //logger = logger_ts;
+                               timestamp = true;
                        }
                        /*
                         * verbosity
                         */
                        if (commandLine.hasOption(DuccUiConstants.name_debug)) {
-                               //logger.setLevel(Level.DEBUG);
                                debug = true;
                        }
                        /*
                         * cancel job enabled
                         */
                        if 
(commandLine.hasOption(DuccUiConstants.name_monitor_cancel_job_on_interrupt)) {
-                               cancel_job_on_interrupt = true;
+                               cancel_job_on_interrupt.set(true);
                        }
                        /*
                         * job id
@@ -364,130 +287,117 @@ public class DuccJobMonitor extends Ducc
                                rc.set(RC_HELP);
                                return rc.get();
                        }
-                       /*
-                        * broker & endpoint
-                        */
-                       if 
(commandLine.hasOption(DuccUiConstants.name_service_broker)) {
-                               broker = 
commandLine.getOptionValue(DuccUiConstants.name_service_broker);
-                       }
-                       if 
(commandLine.hasOption(DuccUiConstants.name_service_endpoint)) {
-                               endpoint = 
commandLine.getOptionValue(DuccUiConstants.name_service_endpoint);
-                       }
                }
-               /* 
-                * echo
-                */
-               debug("jmsProvider="+jmsProvider);
-               debug("broker="+broker);
-               debug("endpoint="+endpoint);
-               debug("id="+jobId);
                
                main = Thread.currentThread();
                
                Thread killer = new Killer(main);
                Runtime.getRuntime().addShutdownHook(killer);
                
-               start();
-               
-               StringBuffer msgName = new StringBuffer();
-               msgName.append("id:"+jobId);
-               msgName.append(" ");
-               msgName.append("location:");
-               msgName.append(ManagementFactory.getRuntimeMXBean().getName());
-               info(msgName.toString());
+               duccPropertiesResolver = DuccPropertiesResolver.getInstance();
                
-               debug("monitor start");
-               while(jobActive.get()) {
-                       try {
-                               Thread.sleep(wakeupTime);
-                       } catch (InterruptedException e) {
-                               debug(e);
-                       }
-                       debug("monitor active...");
-               }
-               debug("monitor stop");
-               stop();
+               adjustWakeupInterval();
                
-               Runtime.getRuntime().removeShutdownHook(killer);
-               
-               showErrors();
-               
-               StringBuffer msgRc = new StringBuffer();
-               msgRc.append("id:"+jobId);
-               msgRc.append(" ");
-               msgRc.append("rc:"+rc.get());
-               info(msgRc.toString());
-               
-               return rc.get();
-       }
-       
-       private void dumpFile(String fileName) {
-               try {
-                       String prefix = "id:"+jobId+" ";
-                       String data = "file:"+fileName;
-                       info(prefix+data);
-                       FileInputStream fis = new FileInputStream(fileName);
-                       DataInputStream dis = new DataInputStream(fis);
-                       BufferedReader br = new BufferedReader(new 
InputStreamReader(dis));
-                       int maxErrors = 1;
-                       int maxLines = MAXLINES;
-                       int lines = 0;
-                       int errors = 0;
-                       while(true) {
-                               String line = br.readLine();
-                               lines ++;
-                               if(line == null) {
-                                       break;
+               boolean observer = true;
+               String urlString = getUrlString(jobId);
+               String lastMessage = "";
+               String thisMessage = "";
+               
+               StringBuffer message = new StringBuffer();
+               
+               message.append("id:"+jobId);
+               message.append(" ");
+               message.append("location:");
+               message.append(ManagementFactory.getRuntimeMXBean().getName());
+               info(message.toString());
+               
+               while(observer) {
+                       
+                       String json = getSingleLineStatus(urlString);
+                       debug(json);
+                       
+                       if(json != null) {
+                               Gson gson = new Gson();
+                               MonitorInfo monitorInfo = gson.fromJson(json, 
MonitorInfo.class);
+
+                               int stateCount = 
monitorInfo.stateSequence.size();
+                               debug("states:"+stateCount);
+                               if(stateCount <= 0) {
+                                       message = new StringBuffer();
+                                       message.append("id:"+jobId);
+                                       message.append(" ");
+                                       message.append("state:"+NotFound);
+                                       thisMessage = message.toString();
+                                       info(thisMessage);
+                                       return rc.get();
                                }
-                               String tline = line.trim();
-                               if(tline.length() == 0) {
-                                       continue;
+                               
+                               String state = "";
+                               Iterator<String> states = 
monitorInfo.stateSequence.iterator();
+                               while(states.hasNext()) {
+                                       state = states.next();
                                }
-                               String[] tokens = tline.split(" ");
-                               if(tokens.length > 5) {
-                                       if(tokens[5].trim().equals("ERROR")) {
-                                               errors++;
-                                       }
+                               
+                               message = new StringBuffer();
+                               message.append("id:"+jobId);
+                               message.append(" ");
+                               message.append("state:"+state);
+                               
+                               if(state.equals(StateRunning)) {
+                                       message.append(details(monitorInfo));
+                               }
+                               else if(state.equals(StateCompleting)) {
+                                       cancel_job_on_interrupt.set(false);
+                                       message.append(details(monitorInfo));
                                }
-                               info(prefix+line);
-                               if(lines > maxLines) {
-                                       info(prefix+"more...");
-                                       break;
+                               else if(state.equals(StateCompleted)) {
+                                       cancel_job_on_interrupt.set(false);
+                                       message.append(details(monitorInfo));
                                }
-                               if(errors > maxErrors) {
-                                       info(prefix+"more...");
-                                       break;
+                               
+                               thisMessage = message.toString();
+                               if(!thisMessage.equals(lastMessage)) {
+                                       info(thisMessage);
+                                       lastMessage = thisMessage;
+                               }
+                               
+                               if(state.equals(StateCompleted)) {
+                                       if(monitorInfo.procs.equals("0")) {
+                                               
if(monitorInfo.total.equals(monitorInfo.done)) {
+                                                       message = new 
StringBuffer();
+                                                       
message.append("id:"+jobId);
+                                                       message.append(" ");
+                                                       
message.append("rc:"+RC_SUCCESS);
+                                                       thisMessage = 
message.toString();
+                                                       info(thisMessage);
+                                                       rc.set(RC_SUCCESS);
+                                                       return rc.get();
+                                               }
+                                               else {
+                                                       message = new 
StringBuffer();
+                                                       
message.append("id:"+jobId);
+                                                       message.append(" ");
+                                                       
message.append("rc:"+RC_FAILURE);
+                                                       thisMessage = 
message.toString();
+                                                       info(thisMessage);
+                                                       rc.set(RC_FAILURE);
+                                                       return rc.get();
+                                               }
+                                       }
                                }
                        }
-                       br.close();
-                       dis.close();
-                       fis.close();
-               }
-               catch(Exception e) {
-                       debug(e);
-               }
-       }
-       
-       private void showErrors() {
-               if(job != null) {
-                       StringBuffer sb = new StringBuffer();
-                       sb.append(job.getLogDirectory());
-                       if(!job.getLogDirectory().endsWith(File.separator)) {
-                               sb.append(File.separator);
+                       else {
+                               error("error: timeout accessing "+urlString);
                        }
-                       sb.append(job.getDuccId().getFriendly()+File.separator);
-                       sb.append("jd.err.log");
+
                        try {
-                               String fileName = sb.toString();
-                               File file = new File(fileName);
-                               if(file.canRead()) {
-                                       dumpFile(fileName);
-                               }
-                       }
-                       catch(Exception e) {
+                               Thread.sleep(wakeupInterval);
+                       } catch (InterruptedException e) {
                                debug(e);
                        }
                }
+               
+               return rc.get();
        }
        
        private class Killer extends Thread {
@@ -497,7 +407,7 @@ public class DuccJobMonitor extends Ducc
                
                public void run() {
                        StringBuffer message = new StringBuffer();
-                       if(cancel_job_on_interrupt) {
+                       if(cancel_job_on_interrupt.get()) {
                                message.append("killer: cancel");
                                cancel();
                        }
@@ -513,8 +423,8 @@ public class DuccJobMonitor extends Ducc
                        ArrayList<String> arrayList = new ArrayList<String>();
                        arrayList.add("--"+DuccUiConstants.name_job_id);
                        arrayList.add(jobId);
-                       arrayList.add("--"+DuccUiConstants.name_service_broker);
-                       arrayList.add(broker);
+                       arrayList.add("--"+DuccUiConstants.name_reason);
+                       arrayList.add("\"submitter was terminated via 
interrupt\"");
                        String[] argList = arrayList.toArray(new String[0]);
                DuccJobCancel duccJobCancel = new DuccJobCancel();
                int retVal = duccJobCancel.run(argList);
@@ -526,7 +436,6 @@ public class DuccJobMonitor extends Ducc
                duccMessageProcessor.exception(e);
        }
        }
-
        public static void main(String[] args) {
                try {
                        DuccJobMonitor duccJobMonitor = new DuccJobMonitor();


Reply via email to