Hi,

   By default, a Thrift client cannot be shared between multiple threads, so I
extended the generated code and implemented a thread-safe version. The service
definition:

  service TestRPC {
    binary search(1:binary param)
 }


Here is my extended client code:

public class ThreadSafeTestRPCClient extends TestRPC.Client {
    AtomicInteger idGenerator = new AtomicInteger(1);
    Connection conn;

    public TestRPCClient(TProtocol prot) {
        super(prot);
        conn = new Connection(prot);
        conn.start();
    }

    public ByteBuffer search(ByteBuffer param) throws
org.apache.thrift.TException {
        int seqid = idGenerator.getAndIncrement();
        String func = "search";
        search_args args = new search_args();
        args.setParam(param);
        search_result result = new search_result();

        Call call = new Call(seqid, func, args, result);

        conn.sendParam(call);
        call.waitComplete();
        return ((search_result)call.getResult()).success;
    }

}

/**
 * Multiplexes calls from several threads over one pair of Thrift protocols.
 *
 * <p>Senders register a {@link Call} under its sequence id and write the
 * request while holding {@code sendLock}. This thread's {@link #run()} loop
 * reads responses, looks the call up by the sequence id of the response, fills
 * in its result object and signals it.
 */
class Connection extends Thread {
    org.apache.thrift.protocol.TProtocol iprot_;
    org.apache.thrift.protocol.TProtocol oprot_;

    /** Serializes writers so that request messages are not interleaved. */
    protected ReentrantLock sendLock = new ReentrantLock();

    /** Calls that have been registered and not yet answered, keyed by sequence id. */
    Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();

    /**
     * Uses the same protocol for reading and writing.
     *
     * @param prot protocol for both directions
     */
    public Connection(org.apache.thrift.protocol.TProtocol prot) {
        this(prot, prot);
    }

    /**
     * @param iprot protocol responses are read from
     * @param oprot protocol requests are written to
     */
    public Connection(org.apache.thrift.protocol.TProtocol iprot,
            org.apache.thrift.protocol.TProtocol oprot) {
        this.iprot_ = iprot;
        this.oprot_ = oprot;
    }

    /**
     * Registers the call and writes its request message.
     *
     * @param call call to send; its sequence id, function name and argument
     *             struct are written as a {@code CALL} message
     * @throws TException if writing or flushing fails; the call is then
     *         unregistered again
     */
    public void sendParam(Call call) throws TException {
        // Register before writing so the reader can never see a response for
        // a call it does not know yet.
        calls.put(call.getSeqid(), call);
        boolean sent = false;
        sendLock.lock();
        try {
            oprot_.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    call.getFunc(),
                    org.apache.thrift.protocol.TMessageType.CALL,
                    call.getSeqid()));
            call.getArgs().write(oprot_);
            oprot_.writeMessageEnd();
            oprot_.getTransport().flush();
            sent = true;
        } finally {
            sendLock.unlock();
            if (!sent) {
                // No response will ever arrive for a request that was not
                // written; do not leave the entry behind.
                calls.remove(call.getSeqid());
            }
        }
    }

    /**
     * Reader loop: consumes response messages until reading fails.
     *
     * <p>An {@code EXCEPTION} reply is printed and the matching call, if any,
     * is signalled with its result left unpopulated; the loop keeps running so
     * one failed call does not take the shared connection down. A reply whose
     * sequence id is not registered is skipped. When the loop ends, every call
     * still registered is signalled so its caller stops waiting.
     */
    @Override
    public void run() {
        try {
            while (true) {
                org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
                Call call = calls.remove(msg.seqid);

                if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
                    org.apache.thrift.TApplicationException x =
                            org.apache.thrift.TApplicationException.read(iprot_);
                    iprot_.readMessageEnd();
                    x.printStackTrace();
                } else if (call == null) {
                    // Nobody is registered for this id: consume the payload so
                    // the stream stays aligned for the next message.
                    org.apache.thrift.protocol.TProtocolUtil.skip(
                            iprot_, org.apache.thrift.protocol.TType.STRUCT);
                    iprot_.readMessageEnd();
                } else {
                    call.getResult().read(iprot_);
                    iprot_.readMessageEnd();
                }

                if (call != null) {
                    call.complete();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // The reader is gone, so no further responses will be delivered.
            // Release everything that is still registered.
            for (Integer pendingId : calls.keySet()) {
                Call pending = calls.remove(pendingId);
                if (pending != null) {
                    pending.complete();
                }
            }
        }
    }
}

/**
 * One in-flight request: its sequence id, function name, argument struct and
 * the result struct to be filled in, plus a lock/condition pair that lets a
 * caller block until {@link #complete()} is invoked.
 */
class Call {
    /** Upper bound, in milliseconds, on how long {@link #waitComplete()} blocks. */
    private static final long WAIT_TIMEOUT_MILLIS = 5000;

    private final int seqid;
    private final String func;
    private final TBase args;
    private final TBase result;
    private final ReentrantLock lock = new ReentrantLock();
    /** Guarded by {@code lock}. */
    private boolean complete = false;
    private final Condition completeCond = lock.newCondition();

    /**
     * @param seqid  sequence id identifying this call
     * @param func   name of the remote function
     * @param args   argument struct to send
     * @param result result struct to be populated from the response
     */
    public Call(int seqid, String func, TBase args, TBase result) {
        this.seqid = seqid;
        this.func = func;
        this.args = args;
        this.result = result;
    }

    /** @return the sequence id given at construction */
    public int getSeqid() {
        return seqid;
    }

    /** @return the remote function name given at construction */
    public String getFunc() {
        return func;
    }

    /** @return the argument struct given at construction */
    public TBase getArgs() {
        return args;
    }

    /** @return the result struct given at construction */
    public TBase getResult() {
        return result;
    }

    /**
     * Blocks until {@link #complete()} has been called, the timeout of
     * {@code WAIT_TIMEOUT_MILLIS} has elapsed, or the thread is interrupted.
     *
     * <p>On interruption the method returns early and the thread's interrupt
     * flag is set again so the caller can observe it.
     */
    void waitComplete() {
        lock.lock();
        try {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(WAIT_TIMEOUT_MILLIS);
            // Loop: a condition wait may return without a signal, so re-check
            // the flag and keep waiting for whatever time is left.
            while (!complete && remainingNanos > 0) {
                remainingNanos = completeCond.awaitNanos(remainingNanos);
            }
        } catch (InterruptedException e) {
            // Do not lose the interruption; restore the flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Marks the call as complete and wakes every thread blocked in {@link #waitComplete()}. */
    void complete() {
        lock.lock();
        try {
            complete = true;
            completeCond.signalAll();
        } finally {
            lock.unlock();
        }
    }
}


But when I use this code from multiple threads, I get a strange error, shown below:

org.apache.thrift.transport.TTransportException
    at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at
org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141)
    at
org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
    at
org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574)
    at
org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443)
    at thrift.Connection.run(TestRPCClient.java:77)


Does anyone know what the problem is, and how to recover from it?

Any advice will be appreciated.

Reply via email to