Author: gnodet
Date: Fri Nov 24 01:42:46 2006
New Revision: 478816

URL: http://svn.apache.org/viewvc?view=rev&rev=478816
Log:
SM-751: Flow tracing with correlation id.
Patch provided by Gianfranco Boccalon

Modified:
    
incubator/servicemix/trunk/common/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
    
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java

Modified: 
incubator/servicemix/trunk/common/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/common/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java?view=diff&rev=478816&r1=478815&r2=478816
==============================================================================
--- 
incubator/servicemix/trunk/common/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 (original)
+++ 
incubator/servicemix/trunk/common/servicemix-common/src/main/java/org/apache/servicemix/common/AsyncBaseLifeCycle.java
 Fri Nov 24 01:42:46 2006
@@ -78,12 +78,15 @@
 
     protected boolean workManagerCreated;
 
-    protected Map processors = new ConcurrentHashMap();
+    protected Map processors;
+    
+    protected ThreadLocal correlationId;
 
     public AsyncBaseLifeCycle() {
         this.running = new AtomicBoolean(false);
         this.polling = new AtomicBoolean(false);
         this.processors = new ConcurrentHashMap();
+        this.correlationId = new ThreadLocal();
     }
 
     public AsyncBaseLifeCycle(ServiceMixComponent component) {
@@ -475,12 +478,21 @@
                 ClassLoader classLoader = 
ep.getServiceUnit().getConfigurationClassLoader();
                 Thread.currentThread().setContextClassLoader(classLoader);
             }
-
+            // Read the correlation id from the exchange and set it in the 
correlation id property
+            String correlationID = 
(String)exchange.getProperty(JbiConstants.CORRELATION_ID);
+            if (correlationID != null) {
+                // Set the id in threadlocal variable
+                correlationId.set(correlationID);
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Retrieved correlation id: " + correlationID);
+            }
             processor.process(exchange);
         } finally {
             Thread.currentThread().setContextClassLoader(oldCl);
+            // Clean the threadlocal variable
+            correlationId.set(null);
         }
-
     }
 
     /**
@@ -507,8 +519,31 @@
      * @throws MessagingException
      */
     public void sendConsumerExchange(MessageExchange exchange, Endpoint 
endpoint) throws MessagingException {
+        // Check if a correlation id is already set on the exchange, otherwise 
create it
+        String correlationIDValue = (String) 
exchange.getProperty(JbiConstants.CORRELATION_ID);
+        if (correlationIDValue == null) {
+            // Retrieve the correlation id from the thread-local variable, if it exists
+            correlationIDValue = (String) correlationId.get();
+            if (correlationIDValue == null) {
+                // Set a correlation id property that has to be propagated to 
all components
+                // to trace the process instance
+                correlationIDValue = exchange.getExchangeId();
+                exchange.setProperty(JbiConstants.CORRELATION_ID, 
exchange.getExchangeId());
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Created correlation id: " + 
correlationIDValue);
+                }
+            } else {
+                // Use correlation id retrieved from previous message exchange
+                exchange.setProperty(JbiConstants.CORRELATION_ID, 
correlationIDValue);
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Correlation id retrieved from ThreadLocal: " 
+ correlationIDValue);
+                }
+            }
+        }
+        // Set the sender endpoint property
         String key = EndpointSupport.getKey(endpoint);
         exchange.setProperty(JbiConstants.SENDER_ENDPOINT, key);
+        // Send the exchange
         channel.send(exchange);
     }
 

Modified: 
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java?view=diff&rev=478816&r1=478815&r2=478816
==============================================================================
--- 
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
 (original)
+++ 
incubator/servicemix/trunk/core/servicemix-core/src/main/java/org/apache/servicemix/JbiConstants.java
 Fri Nov 24 01:42:46 2006
@@ -43,4 +43,12 @@
 
     String HTTP_DESTINATION_URI = "org.apache.servicemix.http.destination.uri";
     
+    /**
+     * This property should be set when a consumer endpoint creates an exchange
+     * related to another provider exchange.  The value of the property should
+     * be set to the value of this property in the provider exchange,
+     * or to the id of the provider exchange if the property does not exist.
+     */
+    String CORRELATION_ID = "org.apache.servicemix.correlationId";
+    
 }


Reply via email to