Author: fhanik
Date: Tue Mar  7 12:24:06 2006
New Revision: 383997

URL: http://svn.apache.org/viewcvs?rev=383997&view=rev
Log:
Most of the impl is complete

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=383997&r1=383996&r2=383997&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java
 Tue Mar  7 12:24:06 2006
@@ -15,17 +15,26 @@
  */
 package org.apache.catalina.tribes.tipis;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
+import org.apache.catalina.tribes.ChannelListener;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.util.UUIDGenerator;
 
 /**
  * A channel to handle RPC messaging
  * @author Filip Hanik
  */
-public class RpcChannel {
+public class RpcChannel implements ChannelListener{
     
     public static final int FIRST_REPLY = 1;
     public static final int MAJORITY_REPLY = 2;
@@ -33,7 +42,9 @@
     
     private Channel channel;
     private RpcCallback callback;
-    private String rpcId;
+    private byte[] rpcId;
+    
+    private HashMap responseMap = new HashMap();
 
     /**
      * Create an RPC channel. You can have several RPC channels attached to a 
group
@@ -42,10 +53,11 @@
      * @param channel Channel
      * @param callback RpcCallback
      */
-    public RpcChannel(String rpcId, Channel channel, RpcCallback callback) {
+    public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
         this.channel = channel;
         this.callback = callback;
         this.rpcId = rpcId;
+        //channel.addChannelListener(this); // NOTE(review): registration is disabled here; presumably the caller has to register this listener on the channel itself - confirm
     }
     
     
@@ -54,15 +66,47 @@
      * @param destination Member[] - the destination for the message, and the 
members you request a reply from
      * @param message Serializable - the message you are sending out
      * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
-     * @param timeout long - timeout in milliseconds, if no reply is received 
within this time an exception is thrown
+     * @param timeout long - timeout in milliseconds, if not enough replies are received 
within this time the replies collected so far (possibly an empty array) are returned
      * @return Response[] - an array of response objects.
      * @throws ChannelException
      */
     public Response[] send(Member[] destination, 
                            Serializable message,
                            int options, 
-                           long timeout) throws ChannelException {
-        throw new UnsupportedOperationException();
+                           long timeout) throws ChannelException, InterruptedException {
+        // one collector per request, registered under a fresh id that the replies echo back
+        RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false));
+        RpcCollector collector = new RpcCollector(key,options,destination.length,timeout);
+        try {
+            synchronized (collector) {
+                // register before sending so that no reply can arrive for an unknown id
+                responseMap.put(key,collector);
+                RpcMessage rmsg = new RpcMessage(rpcId,key.id,message);
+                channel.send(destination,rmsg);
+                // skip the wait when nothing is outstanding (e.g. replies were delivered
+                // on this thread during send); otherwise the notification would already
+                // have been issued and we would block for the whole timeout
+                if ( !collector.isComplete() ) collector.wait(timeout);
+            }
+        } finally {
+            // always unregister: keeps responseMap from growing with every call and lets
+            // replies that arrive after this point fall through to callback.leftOver
+            responseMap.remove(key);
+        }
+        // snapshot under the collector lock; a receiver thread may still be adding a reply
+        synchronized (collector) {
+            return collector.getResponses();
+        }
+    }
+    
+    
+    /**
+     * Hands an incoming RPC message to the collector registered under the
+     * message's uuid. If no collector is registered for that uuid the payload
+     * is passed to the callback as a left over message instead.
+     * @param msg Serializable - the received message, cast to RpcMessage
+     * @param sender Member - the member the message came from
+     */
+    public void messageReceived(Serializable msg, Member sender) {
+        final RpcMessage reply = (RpcMessage)msg;
+        final RpcCollector pending = (RpcCollector)responseMap.get(new RpcCollectorKey(reply.uuid));
+        if ( pending == null ) {
+            // nothing registered under this uuid
+            callback.leftOver(reply.message,sender);
+            return;
+        }
+        synchronized (pending) {
+            pending.addResponse(reply.message,sender);
+            // wake up the sender only once the reply policy is satisfied
+            if ( pending.isComplete() ) {
+                pending.notifyAll();
+            }
+        }
+    }
+    
+    /**
+     * Accepts a message only if it is an RpcMessage whose rpcId has the same
+     * content as the rpcId of this channel.
+     * @param msg Serializable - the message to inspect
+     * @param sender Member - the member the message came from (not inspected)
+     * @return boolean - true if the message is addressed to this RPC channel
+     */
+    public boolean accept(Serializable msg, Member sender) {
+        if ( !(msg instanceof RpcMessage) ) return false;
+        final RpcMessage candidate = (RpcMessage)msg;
+        return Arrays.equals(candidate.rpcId,rpcId);
     }
     
     public Channel getChannel() {
@@ -73,7 +117,7 @@
         return callback;
     }
 
-    public String getRpcId() {
+    public byte[] getRpcId() { // returns the internal id array itself, not a copy
         return rpcId;
     }
 
@@ -85,8 +129,116 @@
         this.callback = callback;
     }
 
-    public void setRpcId(String rpcId) {
+    public void setRpcId(byte[] rpcId) { // stores the given array by reference, no copy is made
         this.rpcId = rpcId;
+    }
+    
+    /**
+     * Envelope that is put on the wire for an RPC payload. It carries the id of
+     * the RPC channel (rpcId), the id of the individual request (uuid) and the
+     * payload itself, and serializes them through Externalizable.
+     */
+    public static class RpcMessage implements Externalizable {
+        
+        private Serializable message;
+        private byte[] uuid;
+        private byte[] rpcId;
+        
+        /**
+         * Public no-arg constructor required by Externalizable: on
+         * deserialization the instance is created through it and then
+         * populated by readExternal. Not intended for direct use.
+         */
+        public RpcMessage() {
+        }
+        
+        /**
+         * @param rpcId byte[] - id of the RPC channel the message belongs to
+         * @param uuid byte[] - id of the request this message is part of
+         * @param message Serializable - the payload
+         */
+        public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
+            this.rpcId = rpcId;
+            this.uuid = uuid;
+            this.message = message;
+        }
+        
+        /**
+         * Reads the fields in the order writeExternal wrote them:
+         * uuid length, uuid, rpcId length, rpcId, payload.
+         * @throws IOException if the stream ends early or cannot be read
+         * @throws ClassNotFoundException if the payload class cannot be resolved
+         */
+        public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException {
+            int length = in.readInt();
+            uuid = new byte[length];
+            // readFully blocks until the whole array is filled; a plain read() may return fewer bytes
+            in.readFully(uuid, 0, length);
+            length = in.readInt();
+            rpcId = new byte[length];
+            in.readFully(rpcId, 0, length);
+            message = (Serializable)in.readObject();
+        }
+    
+        /**
+         * Writes uuid and rpcId as length-prefixed byte arrays followed by the payload.
+         * @throws IOException if the stream cannot be written
+         */
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(uuid.length);
+            out.write(uuid, 0, uuid.length);
+            out.writeInt(rpcId.length);
+            out.write(rpcId, 0, rpcId.length);
+            out.writeObject(message);
+        }
+
+    }
+    
+    /**
+     * 
+     * Class that holds all responses received for one outstanding RPC request
+     * and decides, from the reply option the request was sent with, when
+     * enough of them have arrived.
+     * The class does no locking of its own; callers sharing an instance
+     * between threads are expected to synchronize on it.
+     * @author not attributable
+     * @version 1.0
+     */
+    public static class RpcCollector {
+        public ArrayList responses = new ArrayList(); 
+        public RpcCollectorKey key;
+        public int options;
+        public int destcnt;
+        public long timeout;
+        
+        /**
+         * @param key RpcCollectorKey - id of the request the responses belong to
+         * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY
+         * @param destcnt int - number of members the request was sent to
+         * @param timeout long - timeout of the request in milliseconds
+         */
+        public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) {
+            this.key = key;
+            this.options = options;
+            this.destcnt = destcnt;
+            this.timeout = timeout;
+        }
+        
+        /**
+         * Records one reply together with the member that sent it.
+         */
+        public void addResponse(Serializable message, Member sender){
+            Response resp = new Response(sender,message);
+            responses.add(resp);
+        }
+        
+        /**
+         * @return boolean - true once the number of collected responses
+         * satisfies the reply option; false for an unknown option
+         */
+        public boolean isComplete() {
+            switch (options) {
+                case ALL_REPLY:
+                    // >= rather than == so that a surplus reply cannot make the request look incomplete again
+                    return responses.size() >= destcnt;
+                case MAJORITY_REPLY:
+                    // at least half of the destinations have answered; done in integer
+                    // arithmetic because responses/destcnt is a ratio in [0,1], not a percentage
+                    return responses.size() * 2 >= destcnt;
+                case FIRST_REPLY:
+                    return responses.size()>0;
+                default:
+                    return false;
+            }
+        }
+        
+        public int hashCode() {
+            return key.hashCode();
+        }
+        
+        /**
+         * Two collectors are equal when their keys are equal.
+         */
+        public boolean equals(Object o) {
+            if ( o instanceof RpcCollector ) {
+                RpcCollector r = (RpcCollector)o;
+                return r.key.equals(this.key);
+            } else return false;
+        }
+        
+        /**
+         * @return Response[] - a new array with the responses collected so far,
+         * empty if there are none
+         */
+        public Response[] getResponses() {
+            return (Response[])responses.toArray(new Response[responses.size()]);
+        }
+    }
+    
+    /**
+     * Map key wrapping the byte[] id of an RPC request, so that lookups compare
+     * the content of the id rather than the identity of the array.
+     */
+    public static class RpcCollectorKey {
+        byte[] id;
+        public RpcCollectorKey(byte[] id) {
+            this.id = id;
+        }
+        
+        /**
+         * Content based hash over every byte of the id. Works for ids of any
+         * length (including empty or null, which keys built from received
+         * messages may carry) and is consistent with equals.
+         */
+        public int hashCode() {
+            if ( id == null ) return 0;
+            int hash = 1;
+            for (int i = 0; i < id.length; i++) {
+                hash = 31 * hash + id[i];
+            }
+            return hash;
+        }
+
+        /**
+         * Keys are equal when their ids have the same length and content.
+         */
+        public boolean equals(Object o) {
+            if ( o instanceof RpcCollectorKey ) {
+                RpcCollectorKey r = (RpcCollectorKey)o;
+                return Arrays.equals(id,r.id);
+            } else return false;
+        }
+        
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to