Actually, I have tried this idea. I've built a thread safe connection pool by myself, but found the performace(I mean throughput) of this solution is worse than that of a thread safe client with multiplex connection. The reason I guess may be the connection from the pool is not fully utilized during args writtern to result returned. Appreciate any comment.
On Tue, Sep 27, 2011 at 7:22 AM, Adam Fisk <[email protected]> wrote: > I would actually recommend building a thread safe connection pool > where connections are returned to the pool only after methods are > written and any return values are returned. > > -adam > > On Sun, Aug 28, 2011 at 9:12 PM, chang liu <[email protected]> wrote: > > Hi, > > > > By default, a thrift client can not be used by multi-threads. So I > extend > > the generated code and implement a thread safe version. the service > > defination: > > > > service TestRPC { > > binary search(1:binary param) > > } > > > > > > here is my extended client code: > > > > public class ThreadSafeTestRPCClient extends TestRPC.Client { > > AtomicInteger idGenerator = new AtomicInteger(1); > > Connection conn; > > > > public TestRPCClient(TProtocol prot) { > > super(prot); > > conn = new Connection(prot); > > conn.start(); > > } > > > > public ByteBuffer search(ByteBuffer param) throws > > org.apache.thrift.TException { > > int seqid = idGenerator.getAndIncrement(); > > String func = "search"; > > search_args args = new search_args(); > > args.setParam(param); > > search_result result = new search_result(); > > > > Call call = new Call(seqid, func, args, result); > > > > conn.sendParam(call); > > call.waitComplete(); > > return ((search_result)call.getResult()).success; > > } > > > > } > > > > class Connection extends Thread { > > org.apache.thrift.protocol.TProtocol iprot_; > > org.apache.thrift.protocol.TProtocol oprot_; > > > > protected ReentrantLock sendLock = new ReentrantLock(); > > > > Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>(); > > > > public Connection(org.apache.thrift.protocol.TProtocol prot) { > > this(prot, prot); > > } > > > > public Connection(org.apache.thrift.protocol.TProtocol iprot, > > org.apache.thrift.protocol.TProtocol oprot) { > > this.iprot_ = iprot; > > this.oprot_ = oprot; > > } > > > > public void sendParam(Call call) throws TException { > > calls.put(call.getSeqid(), call); > > sendLock.lock(); > > try { > > oprot_.writeMessageBegin(new > > org.apache.thrift.protocol.TMessage(call.getFunc(), > > org.apache.thrift.protocol.TMessageType.CALL, call.getSeqid())); > > call.getArgs().write(oprot_); > > oprot_.writeMessageEnd(); > > oprot_.getTransport().flush(); > > } finally { > > sendLock.unlock(); > > } > > } > > > > public void run() { > > try { > > while (true) { > > org.apache.thrift.protocol.TMessage msg = > > iprot_.readMessageBegin(); > > if (msg.type == > > org.apache.thrift.protocol.TMessageType.EXCEPTION) { > > org.apache.thrift.TApplicationException x = > > org.apache.thrift.TApplicationException.read(iprot_); > > iprot_.readMessageEnd(); > > throw x; > > } > > int seqid = msg.seqid; > > Call call = calls.remove(seqid); > > call.getResult().read(iprot_); > > iprot_.readMessageEnd(); > > call.complete(); > > } > > } catch (Exception e) { > > e.printStackTrace(); > > } > > } > > } > > > > class Call { > > private int seqid; > > private String func; > > private TBase args; > > private TBase result; > > private ReentrantLock lock = new ReentrantLock(); > > private boolean complete = false; > > private Condition completeCond = lock.newCondition(); > > > > public Call(int seqid, String func, TBase args, TBase result) { > > this.seqid = seqid; > > this.func = func; > > this.args = args; > > this.result = result; > > } > > > > public int getSeqid() { > > return seqid; > > } > > > > public String getFunc() { > > return func; > > } > > > > public TBase getArgs() { > > return args; > > } > > > > public TBase getResult() { > > return result; > > } > > > > void waitComplete() { > > lock.lock(); > > try { > > if (!complete) { > > completeCond.await(5000, TimeUnit.MILLISECONDS); > > } > > } catch (InterruptedException e) { > > > > } finally { > > lock.unlock(); > > } > > } > > > > void complete() { > > lock.lock(); > > try { > > complete = true; > > completeCond.signal(); > > } finally { > > lock.unlock(); > > } > > } > > } > > > > > > But when I use this code in multi-threads, I get strange error as below: > > > > org.apache.thrift.transport.TTransportException > > at > > > org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) > > at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) > > at > > > org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141) > > at > > > org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101) > > at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) > > at > > > org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574) > > at > > > org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443) > > at thrift.Connection.run(TestRPCClient.java:77) > > > > > > does anyone know what the problem is? how to do recovery? > > > > Any advice will be appreciated. > > > > > > -- > Adam Fisk > http://www.littleshoot.org | http://adamfisk.wordpress.com | > http://twitter.com/adamfisk >
