Author: mszefler
Date: Thu Jan 25 11:41:41 2007
New Revision: 499933

URL: http://svn.apache.org/viewvc?view=rev&rev=499933
Log:
Updated checkRetry logic. If we fail after max retries, reschedule the job for 
a bit later. Failing that, save it to disk. 

Modified:
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=499933&r1=499932&r2=499933
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
 Thu Jan 25 11:41:41 2007
@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import javax.wsdl.Operation;
@@ -101,7 +102,7 @@
 
     /** Manage instance-level locks. */
     private final InstanceLockManager _instanceLockManager = new 
InstanceLockManager();
-    
+
     final Contexts _contexts;
 
     public BpelEngineImpl(Contexts contexts) {
@@ -249,8 +250,8 @@
         WorkEvent we = new WorkEvent(jobInfo.jobDetail);
 
         // We lock the instance to prevent concurrent transactions and prevent 
unnecessary rollbacks,
-        // Note that we don't want to wait too long here to get our lock, 
since we are likely holding 
-        // on to scheduler's locks of various sorts. 
+        // Note that we don't want to wait too long here to get our lock, 
since we are likely holding
+        // on to scheduler's locks of various sorts.
         try {
             _instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
         } catch (InterruptedException e) {
@@ -260,17 +261,17 @@
         } catch 
(org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
             __log.debug("Instance " + we.getIID() + " is busy, rescheduling 
job.");
             // TODO: This should really be more of something like the 
exponential backoff algorithm in
-            // ethernet. 
-            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, 
-                    new Date(System.currentTimeMillis() + 
Math.min(randomExp(1000),10000)));
+            // ethernet.
+            _contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new 
Date(System.currentTimeMillis()
+                    + Math.min(randomExp(1000), 10000)));
             return;
         }
-        // DONT PUT CODE HERE-need this method real tight in a try/catch 
block, we need to handle 
-        // all types of failure here, the scheduler is not going to know how 
to handle our errors, 
-        // ALSO we have to release the lock obtained above (IMPORTANT), lest 
the whole system come 
+        // DONT PUT CODE HERE-need this method real tight in a try/catch 
block, we need to handle
+        // all types of failure here, the scheduler is not going to know how 
to handle our errors,
+        // ALSO we have to release the lock obtained above (IMPORTANT), lest 
the whole system come
         // to a grinding halt.
         try {
-            
+
             ProcessInstanceDAO instance;
             if (we.isInMem())
                 instance = 
_contexts.inMemDao.getConnection().getInstance(we.getIID());
@@ -301,16 +302,16 @@
             process.handleWorkEvent(jobInfo.jobDetail);
             debuggingDelay();
         } catch (BpelEngineException bee) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),bee);
-            throw new 
Scheduler.JobProcessorException(bee,checkRetry(jobInfo,bee));
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
+            throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, 
bee));
         } catch (ContextException ce) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),ce);
-            throw new 
Scheduler.JobProcessorException(ce,checkRetry(jobInfo,ce));
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
+            throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, 
ce));
         } catch (RuntimeException rte) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),rte);
-            throw new 
Scheduler.JobProcessorException(rte,checkRetry(jobInfo,rte));
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
+            throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, 
rte));
         } catch (Throwable t) {
-            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()),t);
+            __log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
             throw new Scheduler.JobProcessorException(false);
 
         } finally {
@@ -318,22 +319,49 @@
         }
     }
 
-    private boolean checkRetry(JobInfo jobInfo, Throwable t) {
+    private boolean checkRetry(final JobInfo jobInfo, Throwable t) {
         // TODO, better handling of failed jobs (put them in the DB perhaps?)
         if (jobInfo.retryCount < MAX_RETRIES)
             return true;
 
-        __log.error("Job could not be completed after " + MAX_RETRIES + "! 
",t);
-        
-        try {
-            File f = File.createTempFile("ode-bad-job", ".ser", new File(""));
-            ObjectOutputStream fos = new ObjectOutputStream(new 
FileOutputStream(f));
-            fos.writeObject(jobInfo);
-            fos.close();
-        } catch (Exception ex) {
-            __log.error("Could not save bad job; it will be lost!",ex);
+        __log.error("Job could not be completed after " + MAX_RETRIES + ": " + 
jobInfo.jobDetail, t);
+
+        boolean saveToDisk = false;
+        if (jobInfo.jobDetail.get("final") == null) {
+            __log.error("Rescheduling problematic job for a bit later: " + 
jobInfo.jobDetail, t);
+
+            try {
+                _contexts.scheduler.execIsolatedTransaction(new 
Callable<Void>() {
+
+                    public Void call() throws Exception {
+                        jobInfo.jobDetail.put("final", true);
+                        
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
+                                new Date(System.currentTimeMillis() + 60 * 
1000));
+                        return null;
+                    }
+
+                });
+
+            } catch (Exception ex) {
+                __log.error("Error rescheduling problematic job: " + 
jobInfo.jobDetail,ex);
+                saveToDisk = true;
+            }
+        } else {
+            saveToDisk = true;
         }
         
+        if (saveToDisk)  
+            try {
+                File f = File.createTempFile("ode-bad-job", ".ser", new 
File(""));
+                ObjectOutputStream fos = new ObjectOutputStream(new 
FileOutputStream(f));
+                fos.writeObject(jobInfo);
+                fos.close();
+                __log.error("Saved problematic job to disk (last resort): " + 
jobInfo.jobDetail +" in file " + f);
+            } catch (Exception ex) {
+                __log.error("Could not save bad job; it will be lost: " + 
jobInfo.jobDetail, ex);
+            }
+        
+
         // No more retries.
         return false;
     }
@@ -362,6 +390,7 @@
         long delay = (long) (-Math.log(u) * mean); // Exponential
         return delay;
     }
+
     void fireEvent(BpelEvent event) {
         // Note that the eventListeners list is a copy-on-write array, so no need
         // to mess with synchronization.


Reply via email to