I would actually recommend building a thread-safe connection pool
where a connection is returned to the pool only after the request has
been written and any return value has been read.
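
A minimal sketch of such a pool, assuming Java 8 or later. ClientPool and
PoolCall are hypothetical names, not part of Thrift; C stands for a generated
client such as TestRPC.Client, each built on its own transport. A client that
fails mid-call is replaced rather than reused, on the assumption that its
connection may be left in an unknown state.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/** Hypothetical callback type: one complete request/response on a borrowed client. */
interface PoolCall<C, R> {
    R apply(C client) throws Exception;
}

/** Hypothetical fixed-size pool; not part of Thrift. */
final class ClientPool<C> {
    private final BlockingQueue<C> idle;
    private final Supplier<C> factory;

    ClientPool(int size, Supplier<C> factory) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Borrows a client, runs one whole call on it, then hands it back. */
    <R> R withClient(PoolCall<C, R> call) throws Exception {
        C client = idle.take();              // blocks until a client is free
        boolean healthy = false;
        try {
            R result = call.apply(client);   // written and read by this thread only
            healthy = true;
            return result;
        } finally {
            // Assumption: after a failure the connection state is unknown, so a
            // fresh client takes its slot. A real pool would also close the old
            // transport and cope with the factory itself failing.
            idle.add(healthy ? client : factory.get());
        }
    }

    /** Number of clients currently waiting in the pool. */
    int idleCount() {
        return idle.size();
    }
}
```

With the generated client from the quoted message, a call would look roughly
like pool.withClient(c -> c.search(param)), so no two threads ever share a
client while a request is in flight.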

-adam

On Sun, Aug 28, 2011 at 9:12 PM, chang liu <[email protected]> wrote:
> Hi,
>
>   By default, a Thrift client cannot be shared between threads, so I extended
> the generated code to implement a thread-safe version. The service
> definition:
>
>  service TestRPC {
>    binary search(1:binary param)
>  }
>
>
> Here is my extended client code:
>
> import java.nio.ByteBuffer;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.concurrent.locks.Condition;
> import java.util.concurrent.locks.ReentrantLock;
> import org.apache.thrift.TBase;
> import org.apache.thrift.TException;
> import org.apache.thrift.protocol.TProtocol;
>
> public class ThreadSafeTestRPCClient extends TestRPC.Client {
>    AtomicInteger idGenerator = new AtomicInteger(1);
>    Connection conn;
>
>    public ThreadSafeTestRPCClient(TProtocol prot) {
>        super(prot);
>        conn = new Connection(prot);
>        conn.start();
>    }
>
>    public ByteBuffer search(ByteBuffer param) throws org.apache.thrift.TException {
>        int seqid = idGenerator.getAndIncrement();
>        String func = "search";
>        search_args args = new search_args();
>        args.setParam(param);
>        search_result result = new search_result();
>
>        Call call = new Call(seqid, func, args, result);
>
>        conn.sendParam(call);
>        call.waitComplete();
>        return ((search_result)call.getResult()).success;
>    }
>
> }
>
> class Connection extends Thread {
>    org.apache.thrift.protocol.TProtocol iprot_;
>    org.apache.thrift.protocol.TProtocol oprot_;
>
>    protected ReentrantLock sendLock = new ReentrantLock();
>
>    Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();
>
>    public Connection(org.apache.thrift.protocol.TProtocol prot) {
>        this(prot, prot);
>    }
>
>    public Connection(org.apache.thrift.protocol.TProtocol iprot,
>                      org.apache.thrift.protocol.TProtocol oprot) {
>        this.iprot_ = iprot;
>        this.oprot_ = oprot;
>    }
>
>    public void sendParam(Call call) throws TException {
>        calls.put(call.getSeqid(), call);
>        sendLock.lock();
>        try {
>            oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
>                call.getFunc(), org.apache.thrift.protocol.TMessageType.CALL, call.getSeqid()));
>            call.getArgs().write(oprot_);
>            oprot_.writeMessageEnd();
>            oprot_.getTransport().flush();
>        } finally {
>            sendLock.unlock();
>        }
>    }
>
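>    public void run() {
>        try {
>            while (true) {
>                org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
>                // Look up the waiting call first so every branch can wake it.
>                Call call = calls.remove(msg.seqid);
>                if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
>                    // A server-side error concerns one call only; rethrowing it here
>                    // would kill the reader thread and strand every other caller.
>                    org.apache.thrift.TApplicationException x =
>                        org.apache.thrift.TApplicationException.read(iprot_);
>                    x.printStackTrace();
>                } else if (call != null) {
>                    call.getResult().read(iprot_);
>                } else {
>                    // Unknown seqid: consume the reply so the stream stays aligned.
>                    org.apache.thrift.protocol.TProtocolUtil.skip(iprot_,
>                        org.apache.thrift.protocol.TType.STRUCT);
>                }
>                iprot_.readMessageEnd();
>                if (call != null) {
>                    call.complete();
>                }
>            }
>        } catch (Exception e) {
>            e.printStackTrace();
>            // The transport is unusable now; wake the remaining callers instead of
>            // leaving them blocked until their timeouts expire.
>            for (Call pending : calls.values()) {
>                pending.complete();
>            }
>            calls.clear();
>        }
>    }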
> }
>
> class Call {
>    private int seqid;
>    private String func;
>    private TBase args;
>    private TBase result;
>    private ReentrantLock lock = new ReentrantLock();
>    private boolean complete = false;
>    private Condition completeCond = lock.newCondition();
>
>    public Call(int seqid, String func, TBase args, TBase result) {
>        this.seqid = seqid;
>        this.func = func;
>        this.args = args;
>        this.result = result;
>    }
>
>    public int getSeqid() {
>        return seqid;
>    }
>
>    public String getFunc() {
>        return func;
>    }
>
>    public TBase getArgs() {
>        return args;
>    }
>
>    public TBase getResult() {
>        return result;
>    }
>
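>    void waitComplete() {
>        lock.lock();
>        try {
>            // Loop to guard against spurious wakeups; give up after 5 seconds in total.
>            long nanosLeft = TimeUnit.MILLISECONDS.toNanos(5000);
>            while (!complete && nanosLeft > 0) {
>                nanosLeft = completeCond.awaitNanos(nanosLeft);
>            }
>        } catch (InterruptedException e) {
>            // Restore the interrupt flag instead of swallowing it.
>            Thread.currentThread().interrupt();
>        } finally {
>            lock.unlock();
>        }
>    }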
>
>    void complete() {
>        lock.lock();
>        try {
>            complete = true;
>            completeCond.signal();
>        } finally {
>            lock.unlock();
>        }
>    }
> }
>
>
> But when I use this code from multiple threads, I get a strange error:
>
> org.apache.thrift.transport.TTransportException
>    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
>    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141)
>    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
>    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
>    at org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574)
>    at org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443)
>    at thrift.Connection.run(TestRPCClient.java:77)
>
>
> Does anyone know what the problem is, and how to recover from it?
>
> Any advice will be appreciated.
>
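
On the recovery question: if one multiplexed connection is kept, one option is
to fail every in-flight call as soon as the reader thread hits a transport
error, so that no caller is left waiting on a dead connection. A minimal
sketch, assuming Java 8 or later; PendingCalls is a hypothetical helper, not
part of Thrift, and the reader thread would call failAll from its catch block
before the connection is reopened.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical registry of in-flight calls keyed by seqid. */
final class PendingCalls<R> {
    private final Map<Integer, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Sending thread: register before the request is written. */
    CompletableFuture<R> register(int seqid) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(seqid, future);
        return future;
    }

    /** Reader thread: deliver a decoded reply; false if nobody is waiting. */
    boolean complete(int seqid, R result) {
        CompletableFuture<R> future = pending.remove(seqid);
        return future != null && future.complete(result);
    }

    /** Reader thread: the transport failed, so wake every waiter with the cause. */
    void failAll(Throwable cause) {
        for (Integer seqid : pending.keySet()) {
            CompletableFuture<R> future = pending.remove(seqid);
            if (future != null) {
                future.completeExceptionally(cause);
            }
        }
    }

    /** Sending thread: wait for the reply; the entry is dropped even on timeout. */
    R await(int seqid, CompletableFuture<R> future, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(seqid);   // keeps timed-out calls from piling up in the map
        }
    }

    /** Number of calls still waiting for a reply. */
    int size() {
        return pending.size();
    }
}
```

A caller that receives the transport error this way can decide for itself
whether to retry on a new connection, which a silent timeout does not allow.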



-- 
Adam Fisk
http://www.littleshoot.org | http://adamfisk.wordpress.com |
http://twitter.com/adamfisk
