Author: mriou
Date: Thu Mar 1 12:13:40 2007
New Revision: 513477
URL: http://svn.apache.org/viewvc?view=rev&rev=513477
Log:
Make jobs truly transient when using volatile scheduling. This is used for
in-memory processes so that they no longer touch the DB at all.
Modified:
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/scheduler/TestScheduler.java
Modified:
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
(original)
+++
incubator/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
Thu Mar 1 12:13:40 2007
@@ -52,11 +52,9 @@
* regardless of whether the tansaction commits.
*
* @param jobDetail information about the job
- * @param when when the job should run (<code>null</code> means now)
* @return unique (as far as the scheduler is concerned) job identifier
*/
- String scheduleVolatileJob(boolean transacted, Map<String,Object>
jobDetail,
- Date when) throws ContextException;
+ String scheduleVolatileJob(boolean transacted, Map<String,Object>
jobDetail) throws ContextException;
/**
* Make a good effort to cancel the job. If its already running no big
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
Thu Mar 1 12:13:40 2007
@@ -268,8 +268,7 @@
throw new Scheduler.JobProcessorException(true);
} catch
(org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
__log.debug("Instance " + we.getIID() + " is busy, rescheduling
job.");
- // TODO: This should really be more of something like the
exponential backoff algorithm in
- // ethernet.
+ // TODO: This should really be more of something like the
exponential backoff algorithm in ethernet.
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new
Date(System.currentTimeMillis()
+ Math.min(randomExp(1000), 10000)));
return;
@@ -339,17 +338,17 @@
__log.error("Rescheduling problematic job for a bit later: " +
jobInfo, t);
try {
- _contexts.scheduler.execIsolatedTransaction(new
Callable<Void>() {
-
- public Void call() throws Exception {
- jobInfo.jobDetail.put("final", true);
-
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
- new Date(System.currentTimeMillis() + 60 *
1000));
- return null;
- }
-
- });
-
+ if (jobInfo.jobDetail.get("inmem") != null)
+ _contexts.scheduler.scheduleVolatileJob(true,
jobInfo.jobDetail);
+ else
+ _contexts.scheduler.execIsolatedTransaction(new
Callable<Void>() {
+ public Void call() throws Exception {
+ jobInfo.jobDetail.put("final", true);
+
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail,
+ new Date(System.currentTimeMillis() + 60 *
1000));
+ return null;
+ }
+ });
} catch (Exception ex) {
__log.error("Error rescheduling problematic job: " +
jobInfo,ex);
saveToDisk = true;
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
Thu Mar 1 12:13:40 2007
@@ -659,7 +659,7 @@
we.setCorrelatorId(correlatorId);
we.setCorrelationKey(key);
we.setInMem(_bpelProcess.isInMemory());
- _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(false,
we.getDetail(), new Date());
+ _bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true,
we.getDetail());
}
public String invoke(PartnerLinkInstance partnerLink, Operation operation,
Element outgoingMessage,
@@ -825,7 +825,10 @@
we.setIID(_iid);
we.setType(WorkEvent.Type.RESUME);
we.setInMem(_bpelProcess.isInMemory());
-
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(),
new Date());
+ if (_bpelProcess.isInMemory())
+
_bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true,
we.getDetail());
+ else
+
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we.getDetail(),
new Date());
} catch (ContextException e) {
__log.error("Failed to schedule resume task.", e);
throw new BpelEngineException(e);
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/MyRoleMessageExchangeImpl.java
Thu Mar 1 12:13:40 2007
@@ -135,7 +135,10 @@
_waitingCallbacks.put(getClientId(), callback);
setStatus(Status.ASYNC);
- _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(),
null);
+ if (target.isInMemory())
+ _engine._contexts.scheduler.scheduleVolatileJob(true,
we.getDetail());
+ else
+
_engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
return new ResponseFuture(getClientId());
}
}
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/PartnerRoleMessageExchangeImpl.java
Thu Mar 1 12:13:40 2007
@@ -94,7 +94,10 @@
we.setInMem(true);
we.setChannel(getDAO().getChannel());
we.setMexId(getDAO().getMessageExchangeId());
- _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(), null);
+ if (we.isInMem())
+ _engine._contexts.scheduler.scheduleVolatileJob(true,
we.getDetail());
+ else
+ _engine._contexts.scheduler.schedulePersistedJob(we.getDetail(),
null);
}
/**
Modified:
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
(original)
+++
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Thu Mar 1 12:13:40 2007
@@ -300,9 +300,9 @@
return jobId;
}
- public String scheduleVolatileJob(boolean transacted,
Map<String,Object> jobDetail, Date when) throws ContextException {
- String jobId = _quartz.scheduleVolatileJob(transacted, jobDetail,
when);
- _nextSchedule = when == null ? System.currentTimeMillis() :
when.getTime();
+ public String scheduleVolatileJob(boolean transacted,
Map<String,Object> jobDetail) throws ContextException {
+ String jobId = _quartz.scheduleVolatileJob(transacted, jobDetail);
+ _nextSchedule = System.currentTimeMillis();
return jobId;
}
Modified:
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
(original)
+++
incubator/ode/trunk/bpel-scheduler-quartz/src/main/java/org/apache/ode/bpel/scheduler/quartz/QuartzSchedulerImpl.java
Thu Mar 1 12:13:40 2007
@@ -212,8 +212,29 @@
return jobDetail.getName();
}
- public String scheduleVolatileJob(boolean transacted, Map<String, Object>
detail, Date when) throws ContextException {
- return schedule(detail, when, true, !transacted);
+ public String scheduleVolatileJob(final boolean transacted, final
Map<String, Object> detail) throws ContextException {
+ registerSynchronizer(new Synchronizer() {
+ public void afterCompletion(boolean success) {
+ try {
+ if (transacted) {
+ execIsolatedTransaction(new Callable() {
+ public Object call() throws Exception {
+ JobInfo ji = new JobInfo("volatileJob",
detail, 0);
+ doExecute(ji);
+ return null;
+ }
+ });
+ } else {
+ JobInfo ji = new JobInfo("volatileJob", detail, 0);
+ doExecute(ji);
+ }
+ } catch (Exception e) {
+ throw new ContextException("Failure when starting a new
volatile job.", e);
+ }
+ }
+ public void beforeCompletion() { }
+ });
+ return null;
}
public void cancelJob(String jobId) throws ContextException {
@@ -311,9 +332,7 @@
}
@SuppressWarnings("unchecked")
- private void doExecute(JobExecutionContext jobcontext) throws
JobExecutionException {
- JobInfo ji = new JobInfo(jobcontext.getJobDetail().getName(),
jobcontext.getJobDetail().getJobDataMap(), jobcontext
- .getRefireCount());
+ private void doExecute(JobInfo ji) throws JobExecutionException {
JobProcessor processor = _processor;
if (processor == null)
throw new JobExecutionException("No processor.", null, true);
@@ -332,7 +351,9 @@
public static void execute(JobExecutionContext jobcontext) throws
JobExecutionException {
String schedulerGuid =
jobcontext.getJobDetail().getJobDataMap().getString("__scheduler");
- __instanceMap.get(schedulerGuid).doExecute(jobcontext);
+ JobInfo ji = new JobInfo(jobcontext.getJobDetail().getName(),
jobcontext.getJobDetail().getJobDataMap(),
+ jobcontext.getRefireCount());
+ __instanceMap.get(schedulerGuid).doExecute(ji);
}
/**
Modified:
incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/scheduler/TestScheduler.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/scheduler/TestScheduler.java?view=diff&rev=513477&r1=513476&r2=513477
==============================================================================
---
incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/scheduler/TestScheduler.java
(original)
+++
incubator/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/scheduler/TestScheduler.java
Thu Mar 1 12:13:40 2007
@@ -43,7 +43,7 @@
return null;
}
- public String scheduleVolatileJob(boolean arg0, Map<String, Object> arg1,
Date arg2) throws ContextException {
+ public String scheduleVolatileJob(boolean arg0, Map<String, Object> arg1)
throws ContextException {
return null;
}