Author: assaf
Date: Mon Sep 11 12:53:10 2006
New Revision: 442309

URL: http://svn.apache.org/viewvc?view=rev&rev=442309
Log:
Schedule wrapper, but still not getting test cases to complete

Modified:
    
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
    
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java

Modified: 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=442309&r1=442308&r2=442309
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java
 Mon Sep 11 12:53:10 2006
@@ -1202,7 +1202,6 @@
 
     public void registerActivityForRecovery(ActivityRecoveryChannel channel, 
long activityId, String reason,
                                             Date dateTime, Element data, 
String[] actions) {
-System.out.println("-- Registering activity for recovery: " + 
channel.export().toString());
       if (reason == null)
         reason = "Unspecified";
       if (dateTime == null)
@@ -1211,12 +1210,10 @@
     }
 
     public void unregisterActivityForRecovery(ActivityRecoveryChannel channel) 
{
-System.out.println("-- Unregistering activity for recovery: " + 
channel.export().toString());
       _dao.deleteActivityRecovery(channel.export());
     }
 
     public void recoverActivity(final String channel, final String action, 
final FaultData fault) {
-System.out.println("-- Recovery activity: " + channel + ": " + action);
       vpu.inject(new JacobRunnable() {
           private static final long serialVersionUID = 3168964409165899533L;
           public void run() {

Modified: 
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?view=diff&rev=442309&r1=442308&r2=442309
==============================================================================
--- 
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
 (original)
+++ 
incubator/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
 Mon Sep 11 12:53:10 2006
@@ -38,12 +38,14 @@
 import java.io.IOException;
 import java.io.File;
 import java.net.URI;
+import java.util.Collection;
+import java.util.Date;
 import java.util.Properties;
 import java.util.Hashtable;
 import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.Collection;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.*;
 
 import javax.xml.namespace.QName;
 import javax.wsdl.PortType;
@@ -67,9 +69,9 @@
   Jotm                      _jotm;
   MinervaPool               _minervaPool;
   DataSource                _dataSource;
-  ExecutorService           _executorService;
-  QuartzSchedulerImpl       _scheduler;
+  Scheduler                 _scheduler;
   BpelDAOConnectionFactory  _daoCF;
+  int                       _scheduled;
   EndpointReferenceContext  _eprContext;
   MessageExchangeContext    _mexContext;
   BindingContext            _bindContext;
@@ -151,10 +153,20 @@
     return _server.getBpelManagementFacade();
   }
 
+  public void waitForBlocking() {
+    try {
+      // TODO: change this to lock on an object.
+      while (_scheduled > 0) {
+        Thread.sleep(5);
+        if (_scheduled == 0)
+          break;
+      }
+    } catch (InterruptedException except) { }
+  }
+
   public void shutdown() throws Exception {
     _server.stop();
     _scheduler.stop();
-    _scheduler.shutdown();
     _jotm.stop();
   }
 
@@ -197,13 +209,7 @@
       throw new RuntimeException("No transaction manager");
     if (_dataSource == null)
       throw new RuntimeException("No data source");
-    _executorService = Executors.newCachedThreadPool();
-    _scheduler = new QuartzSchedulerImpl();
-    _scheduler.setBpelServer(_server);
-    _scheduler.setExecutorService(_executorService, 20);
-    _scheduler.setTransactionManager(_txManager);
-    _scheduler.setDataSource(_dataSource);
-    _scheduler.init();
+    _scheduler = new SchedulerWrapper(_server, _txManager, _dataSource);
     return _scheduler;
   }
 
@@ -320,6 +326,59 @@
       }
       throw new RuntimeException("The reference class name \"" + 
ref.getClassName() + "\" is unknown.");
     }
+  }
+
+    QuartzSchedulerImpl _quartz;
+  private class SchedulerWrapper implements Scheduler {
+
+
+    SchedulerWrapper(BpelServer server, TransactionManager txManager, 
DataSource dataSource) {
+      ExecutorService executorService = new ExecutorServiceWrapper();
+      _quartz = new QuartzSchedulerImpl();
+      _quartz.setBpelServer(server);
+      _quartz.setExecutorService(executorService, 20);
+      _quartz.setTransactionManager(txManager);
+      _quartz.setDataSource(dataSource);
+      _quartz.init();
+    }
+
+    public String schedulePersistedJob(Map<String,Object>jobDetail,Date when) 
throws ContextException {
+      String jobId = _quartz.schedulePersistedJob(jobDetail, when);
+      ++_scheduled;
+      return jobId;
+    }
+  
+    public String scheduleVolatileJob(boolean transacted, Map<String,Object> 
jobDetail, Date when) throws ContextException {
+      String jobId = _quartz.scheduleVolatileJob(transacted, jobDetail, when);
+      ++_scheduled;
+      return jobId;
+    }
+  
+    public void cancelJob(String jobId) throws ContextException {
+      _quartz.cancelJob(jobId);
+      --_scheduled;
+    }
+  
+    public <T> T execTransaction(Callable<T> transaction) throws Exception, 
ContextException {
+      return _quartz.execTransaction(transaction);
+    }
+    public void start() { _quartz.start(); }
+    public void stop() { _quartz.stop(); }
+  }
+
+  private class ExecutorServiceWrapper extends AbstractExecutorService {
+    private ExecutorService _service = Executors.newCachedThreadPool();
+    public void execute(Runnable command) {
+      _service.execute(command);
+      --_scheduled;
+    }
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+      return _service.awaitTermination(timeout, unit);
+    }
+    public void shutdown() { _service.shutdown(); }
+    public List<Runnable> shutdownNow() { return _service.shutdownNow(); }
+    public boolean isShutdown() { return _service.isShutdown(); }
+    public boolean isTerminated() { return _service.isTerminated(); }
   }
 
 }


Reply via email to