Author: fhanik
Date: Tue Mar 7 12:24:06 2006
New Revision: 383997
URL: http://svn.apache.org/viewcvs?rev=383997&view=rev
Log: Most of the impl is complete
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java?rev=383997&r1=383996&r2=383997&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tipis/RpcChannel.java Tue Mar 7 12:24:06 2006 @@ -15,17 +15,26 @@ */ package org.apache.catalina.tribes.tipis; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import org.apache.catalina.tribes.Channel; import org.apache.catalina.tribes.ChannelException; +import org.apache.catalina.tribes.ChannelListener; import org.apache.catalina.tribes.Member; +import org.apache.catalina.tribes.util.UUIDGenerator; /** * A channel to handle RPC messaging * @author Filip Hanik */ -public class RpcChannel { +public class RpcChannel implements ChannelListener{ public static final int FIRST_REPLY = 1; public static final int MAJORITY_REPLY = 2; @@ -33,7 +42,9 @@ private Channel channel; private RpcCallback callback; - private String rpcId; + private byte[] rpcId; + + private HashMap responseMap = new HashMap(); /** * Create an RPC channel. 
You can have several RPC channels attached to a group @@ -42,10 +53,11 @@ * @param channel Channel * @param callback RpcCallback */ - public RpcChannel(String rpcId, Channel channel, RpcCallback callback) { + public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) { this.channel = channel; this.callback = callback; this.rpcId = rpcId; + //channel.addChannelListener(this); } @@ -54,15 +66,47 @@ * @param destination Member[] - the destination for the message, and the members you request a reply from * @param message Serializable - the message you are sending out * @param options int - FIRST_REPLY, MAJORITY_REPLY or ALL_REPLY - * @param timeout long - timeout in milliseconds, if no reply is received within this time an exception is thrown + * @param timeout long - timeout in milliseconds, if no reply is received within this time null is returned * @return Response[] - an array of response objects. * @throws ChannelException */ public Response[] send(Member[] destination, Serializable message, int options, - long timeout) throws ChannelException { - throw new UnsupportedOperationException(); + long timeout) throws ChannelException, InterruptedException { + + RpcCollectorKey key = new RpcCollectorKey(UUIDGenerator.randomUUID(false)); + RpcCollector collector = new RpcCollector(key,options,destination.length,timeout); + synchronized (collector) { + responseMap.put(key,collector); + RpcMessage rmsg = new RpcMessage(rpcId,key.id,message); + channel.send(destination,rmsg); + collector.wait(timeout); + } + return collector.getResponses(); + } + + + public void messageReceived(Serializable msg, Member sender) { + RpcMessage rmsg = (RpcMessage)msg; + RpcCollectorKey key = new RpcCollectorKey(rmsg.uuid); + RpcCollector collector = (RpcCollector)responseMap.get(key); + if ( collector == null ) { + callback.leftOver(rmsg.message,sender); + } else { + synchronized (collector) { + collector.addResponse(rmsg.message,sender); + if ( collector.isComplete() ) 
collector.notifyAll(); + } + } + + } + + public boolean accept(Serializable msg, Member sender) { + if ( msg instanceof RpcMessage ) { + RpcMessage rmsg = (RpcMessage)msg; + return Arrays.equals(rmsg.rpcId,rpcId); + }else return false; } public Channel getChannel() { @@ -73,7 +117,7 @@ return callback; } - public String getRpcId() { + public byte[] getRpcId() { return rpcId; } @@ -85,8 +129,116 @@ this.callback = callback; } - public void setRpcId(String rpcId) { + public void setRpcId(byte[] rpcId) { this.rpcId = rpcId; + } + + public static class RpcMessage implements Externalizable { + + private Serializable message; + private byte[] uuid; + private byte[] rpcId; + + public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) { + this.rpcId = rpcId; + this.uuid = uuid; + this.message = message; + } + + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException { + int length = in.readInt(); + uuid = new byte[length]; + in.read(uuid, 0, length); + length = in.readInt(); + rpcId = new byte[length]; + in.read(rpcId, 0, length); + message = (Serializable)in.readObject(); + } + + public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(uuid.length); + out.write(uuid, 0, uuid.length); + out.writeInt(rpcId.length); + out.write(rpcId, 0, rpcId.length); + out.writeObject(message); + } + + } + + /** + * + * Class that holds all response. 
+ * @author not attributable + * @version 1.0 + */ + public static class RpcCollector { + public ArrayList responses = new ArrayList(); + public RpcCollectorKey key; + public int options; + public int destcnt; + public long timeout; + + public RpcCollector(RpcCollectorKey key, int options, int destcnt, long timeout) { + this.key = key; + this.options = options; + this.destcnt = destcnt; + this.timeout = timeout; + } + + public void addResponse(Serializable message, Member sender){ + Response resp = new Response(sender,message); + responses.add(resp); + } + + public boolean isComplete() { + switch (options) { + case ALL_REPLY: + return destcnt == responses.size(); + case MAJORITY_REPLY: + { + float perc = ((float)responses.size()) / ((float)destcnt); + return perc >= 50f; + } + case FIRST_REPLY: + return responses.size()>0; + default: + return false; + } + } + + public int hashCode() { + return key.hashCode(); + } + + public boolean equals(Object o) { + if ( o instanceof RpcCollector ) { + RpcCollector r = (RpcCollector)o; + return r.key.equals(this.key); + } else return false; + } + + public Response[] getResponses() { + return (Response[])responses.toArray(new Response[responses.size()]); + } + } + + public static class RpcCollectorKey { + byte[] id; + public RpcCollectorKey(byte[] id) { + this.id = id; + } + + public int hashCode() { + return id[0]+id[1]+id[2]+id[3]; + } + + public boolean equals(Object o) { + if ( o instanceof RpcCollectorKey ) { + RpcCollectorKey r = (RpcCollectorKey)o; + return Arrays.equals(id,r.id); + } else return false; + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]