Author: scamp
Date: Wed Feb  2 10:17:07 2005
New Revision: 149542

URL: http://svn.apache.org/viewcvs?view=rev&rev=149542
Log:
updated...for emitter

Modified:
    
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
    
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java

Modified: 
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?view=diff&r1=149541&r2=149542
==============================================================================
--- 
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
 (original)
+++ 
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
 Wed Feb  2 10:17:07 2005
@@ -311,7 +311,7 @@
 
     public NotificationConsumer getNotificationConsumer()
     {
-        return null;  //To change body of implemented methods use File | 
Settings | File Templates.
+        return null;
     }
 
     public NotificationProducer getNotificationProducer()

Modified: 
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: 
http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?view=diff&r1=149541&r2=149542
==============================================================================
--- 
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
 (original)
+++ 
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
 Wed Feb  2 10:17:07 2005
@@ -21,11 +21,32 @@
 import org.apache.ws.notification.base.Subscription;
 import org.apache.ws.notification.topics.Topic;
 import org.apache.ws.notification.topics.TopicListener;
+import org.apache.ws.notification.topics.TopicExpressionEngine;
+import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.TopicExpressionEvaluator;
+import 
org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
 import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.util.thread.NamedThread;
+import org.apache.ws.XmlObjectWrapper;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.xmlbeans.XmlObject;
 import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
+import 
org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
+import 
org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
+import 
org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
 
 import java.io.Serializable;
 import java.util.List;
+import java.net.URL;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import javax.xml.soap.SOAPMessage;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPHeader;
+import javax.xml.soap.SOAPBody;
 
 /**
  * Topic listener implementation that will trigger notifications when a topic
@@ -44,6 +65,17 @@
 
     private Subscription m_subscription;
 
+     // the thread pool used to emit notifications
+   private static final PooledExecutor EMITTER_POOL;
+
+    static
+   {
+      EMITTER_POOL = new PooledExecutor( 100 );
+
+      // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
+      EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( 
"notifmgr-emitter", false ) );
+   }
+
     /**
      * Construct a listener instance.
      *
@@ -112,20 +144,27 @@
             {
                 System.out.println("Notification received for subscription 
with Id " + subscription.getID() + "; value: " + msg);
 
-              /*  EndpointReferenceType consumerEPR =
-                    subscription.getNotificationConsumer().getEPR();
+                EndpointReference epr = 
subscription.getNotificationConsumer().getEPR();
+
                 if(subscription.getUseNotify())
                 {
-                    setPort(consumerEPR);
-                    NotifyDocument.Notify notification 
=NotifyDocument.Notify.Factory.newInstance();
-                    EndpointReferenceType producerEndpoint =
-                        subscription.getNotificationProducer().getEPR();
+                    NotifyDocument notifyDoc = 
NotifyDocument.Factory.newInstance();
+                    NotifyDocument.Notify notify = notifyDoc.addNewNotify();
+                    NotificationMessageHolderType 
notificationMessageHolderType = notify.addNewNotificationMessage();
+
+                    //assumes xmlobject for msg...this needs to change
+                    notificationMessageHolderType.setMessage((XmlObject) msg);
+
+                    EndpointReferenceType endpointReferenceType = 
notificationMessageHolderType.addNewProducerReference();
+                    //todo once epr is resolved
+                    
//notificationMessageHolderType.setProducerReference(subscription.getNotificationProducer());
+
                     TopicExpressionEngine engine =
                         TopicExpressionEngineImpl.getInstance();
                     TopicExpression topicExpressionIntf = 
subscription.getTopicExpression();
 
                     TopicExpressionType tp = 
TopicExpressionType.Factory.newInstance();
-                    //todo
+                    //todo ???
 
 
                     String dialect = 
topicExpressionIntf.getDialect().toString();
@@ -143,55 +182,29 @@
 
 
                     }
-                    NotificationMessageHolderType[] message =
-                        {NotificationMessageHolderType.Factory.newInstance()};
-                    message[0].setProducerReference(producerEndpoint);
-                    message[0].setMessage(newValue);
-                    message[0].setTopic(topicExpression);
-                    notification.setNotificationMessage(message);
-                    this.consumerPort.notify(notification);
+                    
notificationMessageHolderType.setTopic((TopicExpressionType) 
((XmlObjectWrapper)topicExpression).getXmlObject());
+                    //notify is done
+
 
+                    //build soap message
+                    SOAPMessage soapMsg = 
MessageFactory.newInstance().createMessage();
+
+                    SOAPHeader soapHeader = soapMsg.getSOAPHeader();
+                    //set the wsa headers
+
+                    SOAPBody soapBody = soapMsg.getSOAPBody();
+                    //doubt this will work...but let's give it a try using a Document type "notifyDoc"
+                    soapBody.addDocument((Document) notifyDoc.newDomNode());
+
+                    
EMITTER_POOL.execute(EmitterTask.createEmitterTask(soapMsg, new 
URL(epr.getAddress().toString())));
 
                 }
                 else
                 {
-                    // TODO: raw notifications
-                }      */
+                    // TODO: raw notifications ///could we build the same as 
above and simply strip off the notify?? not sure
+                }
             }
         }
     }
 
-    //TODO: revisit this from perf angle, don't thing we need to regenerate
-    // the stub quite as aggressively
-    private void setPort(
-
-        EndpointReferenceType consumerEPR)
-        throws Exception
-    {
-            setPort(true, consumerEPR);
-    }
-
-    private synchronized void setPort(
-        boolean reuse,
-        EndpointReferenceType consumerEPR)
-        throws Exception
-    {
-
-        logger.debug("set port with " + reuse);
-        if((reuse) && (this.consumerPort != null))
-        {
-            return;
-        }
-                         //todo decide what to do here...
-       /* if(this.locator == null)
-        {
-            this.locator = new WSBaseNotificationServiceAddressingLocator();
-        }
-
-        if((this.consumerPort == null) || (!reuse))
-        {
-            this.consumerPort =
-                this.locator.getNotificationConsumerPort(consumerEPR);
-        }*/
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to