Author: degenaro
Date: Wed Feb 6 19:35:53 2013
New Revision: 1443158

URL: http://svn.apache.org/viewvc?rev=1443158&view=rev
Log:
UIMA-2642 Replace Camel listening mechanism with WS polling mechanism for Job status monitoring

Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java Modified: uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java URL: http://svn.apache.org/viewvc/uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java?rev=1443158&r1=1443157&r2=1443158&view=diff ============================================================================== --- uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java (original) +++ uima/sandbox/uima-ducc/trunk/uima-ducc-cli/src/main/java/org/apache/uima/ducc/cli/DuccJobMonitor.java Wed Feb 6 19:35:53 2013 @@ -19,22 +19,17 @@ package org.apache.uima.ducc.cli; import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.File; -import java.io.FileInputStream; +import java.io.IOException; import java.io.InputStreamReader; import java.lang.management.ManagementFactory; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLConnection; import java.util.ArrayList; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.camel.component.ActiveMQComponent; -import org.apache.camel.Body; -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.impl.DefaultCamelContext; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -43,20 +38,17 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.uima.ducc.api.DuccMessage; import org.apache.uima.ducc.api.IDuccMessageProcessor; +import org.apache.uima.ducc.common.json.MonitorInfo; import 
org.apache.uima.ducc.common.utils.DuccPropertiesResolver; -import org.apache.uima.ducc.transport.dispatcher.DuccEventDispatcher; -import org.apache.uima.ducc.transport.event.OrchestratorStateDuccEvent; -import org.apache.uima.ducc.transport.event.common.DuccWorkJob; -import org.apache.uima.ducc.transport.event.common.IDuccState.JobState; -import org.apache.uima.ducc.transport.event.common.IDuccTypes.DuccType; -import org.apache.uima.ducc.transport.event.delegate.DuccEventDelegateListener; +import com.google.gson.Gson; /** * Monitor a DUCC job */ -public class DuccJobMonitor extends DuccUi implements DuccEventDelegateListener { +public class DuccJobMonitor extends DuccUi { +private Thread main = null; private static final int RC_UNKNOWN = -1; private static final int RC_SUCCESS = 0; @@ -66,44 +58,34 @@ public class DuccJobMonitor extends Ducc private static final String NotFound = "NotFound"; - private AtomicBoolean stopped = new AtomicBoolean(false); - private AtomicBoolean jobActive = new AtomicBoolean(true); - private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN); - - private String lastMessage = ""; + private static final String StateRunning = "Running"; + private static final String StateCompleting = "Completing"; + private static final String StateCompleted = "Completed"; - private Thread main = null; - - private CamelContext context; - private ActiveMQComponent amqc; + private AtomicInteger rc = new AtomicInteger(RC_UNKNOWN); + private AtomicBoolean cancel_job_on_interrupt = new AtomicBoolean(false); - private String broker = DuccUiUtilities.buildBrokerUrl(); - private String endpoint = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider) - + ":" - + DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint_type) - + ":" - + DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_orchestrator_state_update_endpoint) - ; - private String 
jmsProvider = DuccPropertiesResolver.getInstance().getProperty(DuccPropertiesResolver.ducc_jms_provider); - private String jobId = null; - private DuccWorkJob job = null; private boolean info = true; + private boolean error = true; private boolean debug = false; - private boolean cancel_job_on_interrupt = false; - private int milliseconds = 1; - private int seconds = 1000*milliseconds; - private int wakeupTime = 60*seconds; - - private int MAXLINES = 2000; + private boolean timestamp = false; + + private int milliseconds = 1; + private int seconds = 1000*milliseconds; + private int wakeupInterval = 15*seconds; + + private int urlTimeout = 60*seconds; private IDuccMessageProcessor duccMessageProcessor = new DuccMessage(); + DuccPropertiesResolver duccPropertiesResolver; + private void debug(String message) { if(debug) { - duccMessageProcessor.out(message); + duccMessageProcessor.out(timestamp(message)); } } @@ -115,10 +97,25 @@ public class DuccJobMonitor extends Ducc private void info(String message) { if(info) { - duccMessageProcessor.out(message); + duccMessageProcessor.out(timestamp(message)); } } + private void error(String message) { + if(error) { + duccMessageProcessor.out(timestamp(message)); + } + } + + private String timestamp(String message) { + String tMessage = message; + if(timestamp) { + String date = new java.text.SimpleDateFormat("dd/MM/yyyy HH:mm:ss").format(new java.util.Date()); + tMessage = date+" "+message; + } + return tMessage; + } + public DuccJobMonitor() { } @@ -154,153 +151,87 @@ public class DuccJobMonitor extends Ducc .withLongOpt(DuccUiConstants.name_service_endpoint).create()); } - private void start() { - context = new DefaultCamelContext(); - amqc = ActiveMQComponent.activeMQComponent(broker); - context.addComponent(jmsProvider, amqc); - try { - context.addRoutes(this.routeBuilderForIncomingRequests(endpoint, this)); - } catch (Exception e) { - duccMessageProcessor.exception(e); - } - try { - context.start(); - } catch (Exception e) 
{ - duccMessageProcessor.exception(e); - } + protected void help(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.setWidth(DuccUiConstants.help_width); + formatter.printHelp(DuccJobMonitor.class.getName(), options); + rc.set(RC_HELP); + return; } - private void stop() { - while(!stopped.get()) { - boolean success = stopped.compareAndSet(false, true); - if(success) { - try { - CamelUtil.stop(context); - } catch (Exception e) { - duccMessageProcessor.exception(e); - } - } + private String getSingleLineStatus(String urlString) { + String line = null; + URL url = null; + try { + url = new URL(urlString); + URLConnection uc = url.openConnection(); + uc.setReadTimeout(urlTimeout); + BufferedReader br = new BufferedReader(new InputStreamReader(uc.getInputStream())); + line = br.readLine(); + br.close(); + } + catch (MalformedURLException e) { + e.printStackTrace(); } + catch(IOException e) { + e.printStackTrace(); + } + return line; } - public synchronized RouteBuilder routeBuilderForIncomingRequests(final String endpoint, final DuccJobMonitor delegate) { - return new RouteBuilder() { - public void configure() { - //System.out.println("..... Defining Router on endpoint:"+endpoint); - from(endpoint) - //.unmarshal().xstream() - .process(new MonitorProcessor()) - .bean(delegate); - } - }; - } - - public class MonitorProcessor implements Processor { - public void process( Exchange ex ) { - //System.out.println("..... 
Monitor received an event ...."); + private String getUrlString(String id) { + String host = duccPropertiesResolver.getFileProperty("ducc.ws.node"); + if(host == null) { + host = duccPropertiesResolver.getFileProperty("ducc.head"); } + String port = duccPropertiesResolver.getFileProperty("ducc.ws.port"); + String urlString = "http://"+host+":"+port+"/ducc-servlet/proxy-job-status?id="+id; + debug(urlString); + return urlString; } - boolean isWorkCompleted(String v1, String v2) { - boolean retVal = false; + private void adjustWakeupInterval() { + String rate = duccPropertiesResolver.getFileProperty("ducc.orchestrator.state.publish.rate"); try { - int intValue1 = Integer.parseInt(v1); - int intValue2 = Integer.parseInt(v2); - if(intValue1 >= 0) { - if(intValue1 == intValue2) { - retVal = true; - } - } + wakeupInterval = Integer.parseInt(rate); } catch(Exception e) { - } - return retVal; - } - - public void onOrchestratorStateDuccEvent(@Body OrchestratorStateDuccEvent duccEvent) throws Exception { - synchronized(jobActive) { - if(jobActive.get()) { - debug("JobCount:"+duccEvent.getWorkMap().getJobCount()); - job = (DuccWorkJob) duccEvent.getWorkMap().findDuccWork(DuccType.Job, jobId); - if(job == null) { - StringBuffer message = new StringBuffer(); - message.append("id:"+jobId); - message.append(" "); - message.append("state:"+NotFound); - info(message.toString()); - jobActive.set(false); - } - else { - JobState jobState = (JobState) job.getStateObject(); - String total = job.getSchedulingInfo().getWorkItemsTotal(); - String completed = job.getSchedulingInfo().getWorkItemsCompleted(); - String error = job.getSchedulingInfo().getWorkItemsError(); - String retry = job.getSchedulingInfo().getWorkItemsRetry(); - StringBuffer messageBuffer = new StringBuffer(); - String message = messageBuffer.toString(); - messageBuffer.append("id:"+jobId); - messageBuffer.append(" "); - messageBuffer.append("state:"+jobState); - switch(jobState) { - case Completed: - int count = 
job.getProcessMap().getAliveProcessCount(); - if(count > 0) { - messageBuffer.append(" "); - messageBuffer.append("processes stopping:"+count); - } - else { - jobActive.set(false); - if(isWorkCompleted(total,completed)) { - rc.set(RC_SUCCESS); - } - else { - rc.set(RC_FAILURE); - } - } - case Running: - case Completing: - messageBuffer.append(" "); - messageBuffer.append("total:"+total); - messageBuffer.append(" "); - messageBuffer.append("done:"+completed); - messageBuffer.append(" "); - messageBuffer.append("error:"+error); - messageBuffer.append(" "); - messageBuffer.append("retry:"+retry); - break; - } - message = messageBuffer.toString(); - synchronized(lastMessage) { - if(!message.equals(lastMessage)) { - info(message.toString()); - lastMessage = message; - } - } - } - if(!jobActive.get()) { - main.interrupt(); - } - } - else { - debug("OR publication ignored...job not active"); - } + debug(e); } } - @Override - public void setDuccEventDispatcher(DuccEventDispatcher eventDispatcher) { - throw new RuntimeException(); - } - - protected int help(Options options) { - HelpFormatter formatter = new HelpFormatter(); - formatter.setWidth(DuccUiConstants.help_width); - formatter.printHelp(DuccJobMonitor.class.getName(), options); - return 1; + private String details(MonitorInfo monitorInfo) { + StringBuffer sb = new StringBuffer(); + sb.append(" "); + sb.append("total:"); + sb.append(monitorInfo.total); + sb.append(" "); + sb.append("done:"); + sb.append(monitorInfo.done); + sb.append(" "); + sb.append("error:"); + sb.append(monitorInfo.error); + sb.append(" "); + sb.append("retry:"); + sb.append(monitorInfo.retry); + sb.append(" "); + sb.append("procs:"); + sb.append(monitorInfo.procs); + return sb.toString(); } public int run(String[] args) throws Exception { /* + * require DUCC_HOME + */ + String ducc_home_key = "DUCC_HOME"; + String ducc_home = System.getenv(ducc_home_key); + if(ducc_home == null) { + duccMessageProcessor.err("missing required environment 
variable: "+ducc_home_key); + rc.set(RC_FAILURE); + return rc.get(); + } + /* * parser is not thread safe? */ synchronized(DuccUi.class) { @@ -312,19 +243,12 @@ public class DuccJobMonitor extends Ducc * give help & exit when requested */ if (commandLine.hasOption(DuccUiConstants.name_help)) { - return help(options); + help(options); + return rc.get(); } if(commandLine.getOptions().length == 0) { - return help(options); - } - /* - * require DUCC_HOME - */ - String ducc_home_key = "DUCC_HOME"; - String ducc_home = System.getenv(ducc_home_key); - if(ducc_home == null) { - duccMessageProcessor.err("missing required environment variable: "+ducc_home_key); - return 1; + help(options); + return rc.get(); } /* * detect duplicate options @@ -336,20 +260,19 @@ public class DuccJobMonitor extends Ducc * timestamp */ if (commandLine.hasOption(DuccUiConstants.name_timestamp)) { - //logger = logger_ts; + timestamp = true; } /* * verbosity */ if (commandLine.hasOption(DuccUiConstants.name_debug)) { - //logger.setLevel(Level.DEBUG); debug = true; } /* * cancel job enabled */ if (commandLine.hasOption(DuccUiConstants.name_monitor_cancel_job_on_interrupt)) { - cancel_job_on_interrupt = true; + cancel_job_on_interrupt.set(true); } /* * job id @@ -364,130 +287,117 @@ public class DuccJobMonitor extends Ducc rc.set(RC_HELP); return rc.get(); } - /* - * broker & endpoint - */ - if (commandLine.hasOption(DuccUiConstants.name_service_broker)) { - broker = commandLine.getOptionValue(DuccUiConstants.name_service_broker); - } - if (commandLine.hasOption(DuccUiConstants.name_service_endpoint)) { - endpoint = commandLine.getOptionValue(DuccUiConstants.name_service_endpoint); - } } - /* - * echo - */ - debug("jmsProvider="+jmsProvider); - debug("broker="+broker); - debug("endpoint="+endpoint); - debug("id="+jobId); main = Thread.currentThread(); Thread killer = new Killer(main); Runtime.getRuntime().addShutdownHook(killer); - start(); - - StringBuffer msgName = new StringBuffer(); - 
msgName.append("id:"+jobId); - msgName.append(" "); - msgName.append("location:"); - msgName.append(ManagementFactory.getRuntimeMXBean().getName()); - info(msgName.toString()); + duccPropertiesResolver = DuccPropertiesResolver.getInstance(); - debug("monitor start"); - while(jobActive.get()) { - try { - Thread.sleep(wakeupTime); - } catch (InterruptedException e) { - debug(e); - } - debug("monitor active..."); - } - debug("monitor stop"); - stop(); + adjustWakeupInterval(); - Runtime.getRuntime().removeShutdownHook(killer); - - showErrors(); - - StringBuffer msgRc = new StringBuffer(); - msgRc.append("id:"+jobId); - msgRc.append(" "); - msgRc.append("rc:"+rc.get()); - info(msgRc.toString()); - - return rc.get(); - } - - private void dumpFile(String fileName) { - try { - String prefix = "id:"+jobId+" "; - String data = "file:"+fileName; - info(prefix+data); - FileInputStream fis = new FileInputStream(fileName); - DataInputStream dis = new DataInputStream(fis); - BufferedReader br = new BufferedReader(new InputStreamReader(dis)); - int maxErrors = 1; - int maxLines = MAXLINES; - int lines = 0; - int errors = 0; - while(true) { - String line = br.readLine(); - lines ++; - if(line == null) { - break; + boolean observer = true; + String urlString = getUrlString(jobId); + String lastMessage = ""; + String thisMessage = ""; + + StringBuffer message = new StringBuffer(); + + message.append("id:"+jobId); + message.append(" "); + message.append("location:"); + message.append(ManagementFactory.getRuntimeMXBean().getName()); + info(message.toString()); + + while(observer) { + + String json = getSingleLineStatus(urlString); + debug(json); + + if(json != null) { + Gson gson = new Gson(); + MonitorInfo monitorInfo = gson.fromJson(json, MonitorInfo.class); + + int stateCount = monitorInfo.stateSequence.size(); + debug("states:"+stateCount); + if(stateCount <= 0) { + message = new StringBuffer(); + message.append("id:"+jobId); + message.append(" "); + 
message.append("state:"+NotFound); + thisMessage = message.toString(); + info(thisMessage); + return rc.get(); } - String tline = line.trim(); - if(tline.length() == 0) { - continue; + + String state = ""; + Iterator<String> states = monitorInfo.stateSequence.iterator(); + while(states.hasNext()) { + state = states.next(); } - String[] tokens = tline.split(" "); - if(tokens.length > 5) { - if(tokens[5].trim().equals("ERROR")) { - errors++; - } + + message = new StringBuffer(); + message.append("id:"+jobId); + message.append(" "); + message.append("state:"+state); + + if(state.equals(StateRunning)) { + message.append(details(monitorInfo)); + } + else if(state.equals(StateCompleting)) { + cancel_job_on_interrupt.set(false); + message.append(details(monitorInfo)); } - info(prefix+line); - if(lines > maxLines) { - info(prefix+"more..."); - break; + else if(state.equals(StateCompleted)) { + cancel_job_on_interrupt.set(false); + message.append(details(monitorInfo)); } - if(errors > maxErrors) { - info(prefix+"more..."); - break; + + thisMessage = message.toString(); + if(!thisMessage.equals(lastMessage)) { + info(thisMessage); + lastMessage = thisMessage; + } + + if(state.equals(StateCompleted)) { + if(monitorInfo.procs.equals("0")) { + if(monitorInfo.total.equals(monitorInfo.done)) { + message = new StringBuffer(); + message.append("id:"+jobId); + message.append(" "); + message.append("rc:"+RC_SUCCESS); + thisMessage = message.toString(); + info(thisMessage); + rc.set(RC_SUCCESS); + return rc.get(); + } + else { + message = new StringBuffer(); + message.append("id:"+jobId); + message.append(" "); + message.append("rc:"+RC_FAILURE); + thisMessage = message.toString(); + info(thisMessage); + rc.set(RC_FAILURE); + return rc.get(); + } + } } } - br.close(); - dis.close(); - fis.close(); - } - catch(Exception e) { - debug(e); - } - } - - private void showErrors() { - if(job != null) { - StringBuffer sb = new StringBuffer(); - sb.append(job.getLogDirectory()); - 
if(!job.getLogDirectory().endsWith(File.separator)) { - sb.append(File.separator); + else { + error("error: timeout accessing "+urlString); } - sb.append(job.getDuccId().getFriendly()+File.separator); - sb.append("jd.err.log"); + try { - String fileName = sb.toString(); - File file = new File(fileName); - if(file.canRead()) { - dumpFile(fileName); - } - } - catch(Exception e) { + Thread.sleep(wakeupInterval); + } catch (InterruptedException e) { debug(e); } } + + return rc.get(); } private class Killer extends Thread { @@ -497,7 +407,7 @@ public class DuccJobMonitor extends Ducc public void run() { StringBuffer message = new StringBuffer(); - if(cancel_job_on_interrupt) { + if(cancel_job_on_interrupt.get()) { message.append("killer: cancel"); cancel(); } @@ -513,8 +423,8 @@ public class DuccJobMonitor extends Ducc ArrayList<String> arrayList = new ArrayList<String>(); arrayList.add("--"+DuccUiConstants.name_job_id); arrayList.add(jobId); - arrayList.add("--"+DuccUiConstants.name_service_broker); - arrayList.add(broker); + arrayList.add("--"+DuccUiConstants.name_reason); + arrayList.add("\"submitter was terminated via interrupt\""); String[] argList = arrayList.toArray(new String[0]); DuccJobCancel duccJobCancel = new DuccJobCancel(); int retVal = duccJobCancel.run(argList); @@ -526,7 +436,6 @@ public class DuccJobMonitor extends Ducc duccMessageProcessor.exception(e); } } - public static void main(String[] args) { try { DuccJobMonitor duccJobMonitor = new DuccJobMonitor();