Author: assaf
Date: Mon Sep 11 12:53:10 2006
New Revision: 442309
URL: http://svn.apache.org/viewvc?view=rev&rev=442309
Log:
Schedule wrapper, but still not getting test cases to complete
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
Modified:
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=442309&r1=442308&r2=442309
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Mon Sep 11 12:53:10 2006
@@ -1202,7 +1202,6 @@
public void registerActivityForRecovery(ActivityRecoveryChannel channel,
long activityId, String reason,
Date dateTime, Element data,
String[] actions) {
-System.out.println("-- Registering activity for recovery: " + channel.export().toString());
if (reason == null)
reason = "Unspecified";
if (dateTime == null)
@@ -1211,12 +1210,10 @@
}
public void unregisterActivityForRecovery(ActivityRecoveryChannel channel)
{
-System.out.println("-- Unregistering activity for recovery: " + channel.export().toString());
_dao.deleteActivityRecovery(channel.export());
}
public void recoverActivity(final String channel, final String action,
final FaultData fault) {
-System.out.println("-- Recovery activity: " + channel + ": " + action);
vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 3168964409165899533L;
public void run() {
Modified:
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=442309&r1=442308&r2=442309
==============================================================================
--- incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Mon Sep 11 12:53:10 2006
@@ -38,12 +38,14 @@
import java.io.IOException;
import java.io.File;
import java.net.URI;
+import java.util.Collection;
+import java.util.Date;
import java.util.Properties;
import java.util.Hashtable;
import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.Collection;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.*;
import javax.xml.namespace.QName;
import javax.wsdl.PortType;
@@ -67,9 +69,9 @@
Jotm _jotm;
MinervaPool _minervaPool;
DataSource _dataSource;
- ExecutorService _executorService;
- QuartzSchedulerImpl _scheduler;
+ Scheduler _scheduler;
BpelDAOConnectionFactory _daoCF;
+ int _scheduled;
EndpointReferenceContext _eprContext;
MessageExchangeContext _mexContext;
BindingContext _bindContext;
@@ -151,10 +153,20 @@
return _server.getBpelManagementFacade();
}
+ public void waitForBlocking() {
+ try {
+ // TODO: change this to lock on an object.
+ while (_scheduled > 0) {
+ Thread.sleep(5);
+ if (_scheduled == 0)
+ break;
+ }
+ } catch (InterruptedException except) { }
+ }
+
public void shutdown() throws Exception {
_server.stop();
_scheduler.stop();
- _scheduler.shutdown();
_jotm.stop();
}
@@ -197,13 +209,7 @@
throw new RuntimeException("No transaction manager");
if (_dataSource == null)
throw new RuntimeException("No data source");
- _executorService = Executors.newCachedThreadPool();
- _scheduler = new QuartzSchedulerImpl();
- _scheduler.setBpelServer(_server);
- _scheduler.setExecutorService(_executorService, 20);
- _scheduler.setTransactionManager(_txManager);
- _scheduler.setDataSource(_dataSource);
- _scheduler.init();
+ _scheduler = new SchedulerWrapper(_server, _txManager, _dataSource);
return _scheduler;
}
@@ -320,6 +326,59 @@
}
throw new RuntimeException("The reference class name \"" +
ref.getClassName() + "\" is unknown.");
}
+ }
+
+ QuartzSchedulerImpl _quartz;
+ private class SchedulerWrapper implements Scheduler {
+
+
+ SchedulerWrapper(BpelServer server, TransactionManager txManager, DataSource dataSource) {
+ ExecutorService executorService = new ExecutorServiceWrapper();
+ _quartz = new QuartzSchedulerImpl();
+ _quartz.setBpelServer(server);
+ _quartz.setExecutorService(executorService, 20);
+ _quartz.setTransactionManager(txManager);
+ _quartz.setDataSource(dataSource);
+ _quartz.init();
+ }
+
+ public String schedulePersistedJob(Map<String,Object>jobDetail,Date when) throws ContextException {
+ String jobId = _quartz.schedulePersistedJob(jobDetail, when);
+ ++_scheduled;
+ return jobId;
+ }
+
+ public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail, Date when) throws ContextException {
+ String jobId = _quartz.scheduleVolatileJob(transacted, jobDetail, when);
+ ++_scheduled;
+ return jobId;
+ }
+
+ public void cancelJob(String jobId) throws ContextException {
+ _quartz.cancelJob(jobId);
+ --_scheduled;
+ }
+
+ public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
+ return _quartz.execTransaction(transaction);
+ }
+ public void start() { _quartz.start(); }
+ public void stop() { _quartz.stop(); }
+ }
+
+ private class ExecutorServiceWrapper extends AbstractExecutorService {
+ private ExecutorService _service = Executors.newCachedThreadPool();
+ public void execute(Runnable command) {
+ _service.execute(command);
+ --_scheduled;
+ }
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return _service.awaitTermination(timeout, unit);
+ }
+ public void shutdown() { _service.shutdown(); }
+ public List<Runnable> shutdownNow() { return _service.shutdownNow(); }
+ public boolean isShutdown() { return _service.isShutdown(); }
+ public boolean isTerminated() { return _service.isTerminated(); }
}
}