Author: gnodet
Date: Sun Oct 15 12:12:52 2006
New Revision: 464253

URL: http://svn.apache.org/viewvc?view=rev&rev=464253
Log:
Enhance EndpointComponentContext.java and EndpointDeliveryChannel.java to allow 
asynchronous consumer exchanges

Modified:
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
    
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
 Sun Oct 15 12:12:52 2006
@@ -34,11 +34,13 @@
 
 public class EndpointComponentContext implements ComponentContext {
 
+    private Endpoint endpoint;
     private ComponentContext context;
     private DeliveryChannel channel;
     
-    public EndpointComponentContext(ComponentContext context) {
-        this.context = context;
+    public EndpointComponentContext(Endpoint endpoint) {
+        this.endpoint = endpoint;
+        this.context = 
endpoint.getServiceUnit().getComponent().getComponentContext();
     }
 
     public ServiceEndpoint activateEndpoint(QName serviceName, String 
endpointName) throws JBIException {
@@ -59,7 +61,7 @@
 
     public DeliveryChannel getDeliveryChannel() throws MessagingException {
         if (this.channel == null) {
-            this.channel = new 
EndpointDeliveryChannel(context.getDeliveryChannel());
+            this.channel = new EndpointDeliveryChannel(endpoint);
         }
         return this.channel;
     }

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
 Sun Oct 15 12:12:52 2006
@@ -21,6 +21,7 @@
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
 import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.xml.namespace.QName;
 
@@ -35,9 +36,11 @@
 public class EndpointDeliveryChannel implements DeliveryChannel {
 
     private final DeliveryChannel channel;
+    private final Endpoint endpoint;
     
-    public EndpointDeliveryChannel(DeliveryChannel channel) {
-        this.channel = channel;
+    public EndpointDeliveryChannel(Endpoint endpoint) throws 
MessagingException {
+        this.endpoint = endpoint;
+        this.channel = 
endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
     }
 
     public MessageExchange accept() throws MessagingException {
@@ -69,10 +72,12 @@
     }
 
     public void send(MessageExchange exchange) throws MessagingException {
-        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-            throw new UnsupportedOperationException("Asynchronous send of 
active exchanges are not supported");
+        if (exchange.getStatus() == ExchangeStatus.ACTIVE && 
exchange.getRole() == Role.CONSUMER) {
+            ServiceMixComponent comp = 
endpoint.getServiceUnit().getComponent();
+            comp.sendConsumerExchange(exchange, endpoint);
+        } else {
+            channel.send(exchange);
         }
-        channel.send(exchange);
     }
 
     public boolean sendSync(MessageExchange exchange, long timeout) throws 
MessagingException {

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
 Sun Oct 15 12:12:52 2006
@@ -17,6 +17,12 @@
  */
 package org.apache.servicemix.common;
 
+import java.util.Date;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessagingException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.components.varscheduler.ScheduleIterator;
@@ -24,13 +30,6 @@
 import org.apache.servicemix.components.varscheduler.SchedulerTask;
 import org.apache.servicemix.executors.Executor;
 
-import javax.jbi.JBIException;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.management.DeploymentException;
-import javax.resource.spi.work.Work;
-import java.util.Date;
-
 /**
  * An implementation inheritance class for an endpoint which polls some 
resource at periodic intervals to decide if
  * there is an event to process.
@@ -150,10 +149,7 @@
             try {
                 // lets run the work inside the JCA worker pools to ensure
                 // the threads are setup correctly when we actually do stuff
-                getExecutor().execute(new Work() {
-                    public void release() {
-                    }
-
+                getExecutor().execute(new Runnable() {
                     public void run() {
                         try {
                             poll();

Modified: 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
 Sun Oct 15 12:12:52 2006
@@ -34,6 +34,7 @@
     private ServiceEndpoint activated;
     private DeliveryChannel channel;
     private MessageExchangeFactory exchangeFactory;
+    private ComponentContext context;
 
 
     public ProviderEndpoint() {
@@ -57,7 +58,8 @@
 
     public void activate() throws Exception {
         ComponentContext ctx = 
getServiceUnit().getComponent().getComponentContext();
-        channel = ctx.getDeliveryChannel();
+        context = new EndpointComponentContext(this);
+        channel = context.getDeliveryChannel();
         exchangeFactory = channel.createExchangeFactory();
         activated = ctx.activateEndpoint(service, endpoint);
         start();


Reply via email to