Author: gnodet
Date: Sun Oct 15 12:12:52 2006
New Revision: 464253
URL: http://svn.apache.org/viewvc?view=rev&rev=464253
Log:
Enhance EndpointComponentContext.java and EndpointDeliveryChannel.java to allow
asynchronous consumer exchanges
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointComponentContext.java
Sun Oct 15 12:12:52 2006
@@ -34,11 +34,13 @@
public class EndpointComponentContext implements ComponentContext {
+ private Endpoint endpoint;
private ComponentContext context;
private DeliveryChannel channel;
- public EndpointComponentContext(ComponentContext context) {
- this.context = context;
+ public EndpointComponentContext(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ this.context =
endpoint.getServiceUnit().getComponent().getComponentContext();
}
public ServiceEndpoint activateEndpoint(QName serviceName, String
endpointName) throws JBIException {
@@ -59,7 +61,7 @@
public DeliveryChannel getDeliveryChannel() throws MessagingException {
if (this.channel == null) {
- this.channel = new
EndpointDeliveryChannel(context.getDeliveryChannel());
+ this.channel = new EndpointDeliveryChannel(endpoint);
}
return this.channel;
}
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/EndpointDeliveryChannel.java
Sun Oct 15 12:12:52 2006
@@ -21,6 +21,7 @@
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.xml.namespace.QName;
@@ -35,9 +36,11 @@
public class EndpointDeliveryChannel implements DeliveryChannel {
private final DeliveryChannel channel;
+ private final Endpoint endpoint;
- public EndpointDeliveryChannel(DeliveryChannel channel) {
- this.channel = channel;
+ public EndpointDeliveryChannel(Endpoint endpoint) throws
MessagingException {
+ this.endpoint = endpoint;
+ this.channel =
endpoint.getServiceUnit().getComponent().getComponentContext().getDeliveryChannel();
}
public MessageExchange accept() throws MessagingException {
@@ -69,10 +72,12 @@
}
public void send(MessageExchange exchange) throws MessagingException {
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- throw new UnsupportedOperationException("Asynchronous send of
active exchanges are not supported");
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE &&
exchange.getRole() == Role.CONSUMER) {
+ ServiceMixComponent comp =
endpoint.getServiceUnit().getComponent();
+ comp.sendConsumerExchange(exchange, endpoint);
+ } else {
+ channel.send(exchange);
}
- channel.send(exchange);
}
public boolean sendSync(MessageExchange exchange, long timeout) throws
MessagingException {
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/PollingEndpoint.java
Sun Oct 15 12:12:52 2006
@@ -17,6 +17,12 @@
*/
package org.apache.servicemix.common;
+import java.util.Date;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.DeliveryChannel;
+import javax.jbi.messaging.MessagingException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.components.varscheduler.ScheduleIterator;
@@ -24,13 +30,6 @@
import org.apache.servicemix.components.varscheduler.SchedulerTask;
import org.apache.servicemix.executors.Executor;
-import javax.jbi.JBIException;
-import javax.jbi.messaging.DeliveryChannel;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.management.DeploymentException;
-import javax.resource.spi.work.Work;
-import java.util.Date;
-
/**
* An implementation inheritance class for an endpoint which polls some
resource at periodic intervals to decide if
* there is an event to process.
@@ -150,10 +149,7 @@
try {
// lets run the work inside the JCA worker pools to ensure
// the threads are setup correctly when we actually do stuff
- getExecutor().execute(new Work() {
- public void release() {
- }
-
+ getExecutor().execute(new Runnable() {
public void run() {
try {
poll();
Modified:
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java?view=diff&rev=464253&r1=464252&r2=464253
==============================================================================
---
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
(original)
+++
incubator/servicemix/trunk/servicemix-common/src/main/java/org/apache/servicemix/common/ProviderEndpoint.java
Sun Oct 15 12:12:52 2006
@@ -34,6 +34,7 @@
private ServiceEndpoint activated;
private DeliveryChannel channel;
private MessageExchangeFactory exchangeFactory;
+ private ComponentContext context;
public ProviderEndpoint() {
@@ -57,7 +58,8 @@
public void activate() throws Exception {
ComponentContext ctx =
getServiceUnit().getComponent().getComponentContext();
- channel = ctx.getDeliveryChannel();
+ context = new EndpointComponentContext(this);
+ channel = context.getDeliveryChannel();
exchangeFactory = channel.createExchangeFactory();
activated = ctx.activateEndpoint(service, endpoint);
start();