Author: mszefler Date: Thu Jan 25 11:41:41 2007 New Revision: 499933 URL: http://svn.apache.org/viewvc?view=rev&rev=499933 Log: Updated checkRetry logic. If a job still fails after the maximum number of retries, reschedule it once for a bit later. If that rescheduling fails, or the job has already been rescheduled once, save it to disk as a last resort.
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=499933&r1=499932&r2=499933 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java Thu Jan 25 11:41:41 2007 @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import javax.wsdl.Operation; @@ -101,7 +102,7 @@ /** Manage instance-level locks. */ private final InstanceLockManager _instanceLockManager = new InstanceLockManager(); - + final Contexts _contexts; public BpelEngineImpl(Contexts contexts) { @@ -249,8 +250,8 @@ WorkEvent we = new WorkEvent(jobInfo.jobDetail); // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks, - // Note that we don't want to wait too long here to get our lock, since we are likely holding - // on to scheduler's locks of various sorts. + // Note that we don't want to wait too long here to get our lock, since we are likely holding + // on to scheduler's locks of various sorts. try { _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS); } catch (InterruptedException e) { @@ -260,17 +261,17 @@ } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) { __log.debug("Instance " + we.getIID() + " is busy, rescheduling job."); // TODO: This should really be more of something like the exponential backoff algorithm in - // ethernet. 
- _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, - new Date(System.currentTimeMillis() + Math.min(randomExp(1000),10000))); + // ethernet. + _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis() + + Math.min(randomExp(1000), 10000))); return; } - // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle - // all types of failure here, the scheduler is not going to know how to handle our errors, - // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come + // DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle + // all types of failure here, the scheduler is not going to know how to handle our errors, + // ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come // to a grinding halt. try { - + ProcessInstanceDAO instance; if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID()); @@ -301,16 +302,16 @@ process.handleWorkEvent(jobInfo.jobDetail); debuggingDelay(); } catch (BpelEngineException bee) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),bee); - throw new Scheduler.JobProcessorException(bee,checkRetry(jobInfo,bee)); + __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee); + throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee)); } catch (ContextException ce) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),ce); - throw new Scheduler.JobProcessorException(ce,checkRetry(jobInfo,ce)); + __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce); + throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce)); } catch (RuntimeException rte) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),rte); - throw new Scheduler.JobProcessorException(rte,checkRetry(jobInfo,rte)); + __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte); + throw new 
Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte)); } catch (Throwable t) { - __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),t); + __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t); throw new Scheduler.JobProcessorException(false); } finally { @@ -318,22 +319,49 @@ } } - private boolean checkRetry(JobInfo jobInfo, Throwable t) { + private boolean checkRetry(final JobInfo jobInfo, Throwable t) { // TODO, better handling of failed jobs (put them in the DB perhaps?) if (jobInfo.retryCount < MAX_RETRIES) return true; - __log.error("Job could not be completed after " + MAX_RETRIES + "! ",t); - - try { - File f = File.createTempFile("ode-bad-job", ".ser", new File("")); - ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f)); - fos.writeObject(jobInfo); - fos.close(); - } catch (Exception ex) { - __log.error("Could not save bad job; it will be lost!",ex); + __log.error("Job could not be completed after " + MAX_RETRIES + ": " + jobInfo.jobDetail, t); + + boolean saveToDisk = false; + if (jobInfo.jobDetail.get("final") == null) { + __log.error("Rescheduling problematic job for a bit later: " + jobInfo.jobDetail, t); + + try { + _contexts.scheduler.execIsolatedTransaction(new Callable<Void>() { + + public Void call() throws Exception { + jobInfo.jobDetail.put("final", true); + _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, + new Date(System.currentTimeMillis() + 60 * 1000)); + return null; + } + + }); + + } catch (Exception ex) { + __log.error("Error rescheduling problematic job: " + jobInfo.jobDetail,ex); + saveToDisk = true; + } + } else { + saveToDisk = true; } + if (saveToDisk) + try { + File f = File.createTempFile("ode-bad-job", ".ser", new File("")); + ObjectOutputStream fos = new ObjectOutputStream(new FileOutputStream(f)); + fos.writeObject(jobInfo); + fos.close(); + __log.error("Saved problematic job to disk (last resort): " + jobInfo.jobDetail +" in file " + f); + } catch (Exception ex) { + 
__log.error("Could not save bad job; it will be lost: " + jobInfo.jobDetail, ex); + } + + // No more retries. return false; } @@ -362,6 +390,7 @@ long delay = (long) (-Math.log(u) * mean); // Exponential return delay; } + void fireEvent(BpelEvent event) { // Note that the eventListeners list is a copy-on-write array, so need // to mess with synchronization.