I would actually recommend building a thread safe connection pool where connections are returned to the pool only after methods are written and any return values are returned.
-adam On Sun, Aug 28, 2011 at 9:12 PM, chang liu <[email protected]> wrote: > Hi, > > By default, a thrift client can not be used by multi-threads. So I extend > the generated code and implement a thread safe version. the service > defination: > > service TestRPC { > binary search(1:binary param) > } > > > here is my extended client code: > > public class ThreadSafeTestRPCClient extends TestRPC.Client { > AtomicInteger idGenerator = new AtomicInteger(1); > Connection conn; > > public TestRPCClient(TProtocol prot) { > super(prot); > conn = new Connection(prot); > conn.start(); > } > > public ByteBuffer search(ByteBuffer param) throws > org.apache.thrift.TException { > int seqid = idGenerator.getAndIncrement(); > String func = "search"; > search_args args = new search_args(); > args.setParam(param); > search_result result = new search_result(); > > Call call = new Call(seqid, func, args, result); > > conn.sendParam(call); > call.waitComplete(); > return ((search_result)call.getResult()).success; > } > > } > > class Connection extends Thread { > org.apache.thrift.protocol.TProtocol iprot_; > org.apache.thrift.protocol.TProtocol oprot_; > > protected ReentrantLock sendLock = new ReentrantLock(); > > Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>(); > > public Connection(org.apache.thrift.protocol.TProtocol prot) { > this(prot, prot); > } > > public Connection(org.apache.thrift.protocol.TProtocol iprot, > org.apache.thrift.protocol.TProtocol oprot) { > this.iprot_ = iprot; > this.oprot_ = oprot; > } > > public void sendParam(Call call) throws TException { > calls.put(call.getSeqid(), call); > sendLock.lock(); > try { > oprot_.writeMessageBegin(new > org.apache.thrift.protocol.TMessage(call.getFunc(), > org.apache.thrift.protocol.TMessageType.CALL, call.getSeqid())); > call.getArgs().write(oprot_); > oprot_.writeMessageEnd(); > oprot_.getTransport().flush(); > } finally { > sendLock.unlock(); > } > } > > public void run() { > try { > while (true) { > org.apache.thrift.protocol.TMessage msg = > iprot_.readMessageBegin(); > if (msg.type == > org.apache.thrift.protocol.TMessageType.EXCEPTION) { > org.apache.thrift.TApplicationException x = > org.apache.thrift.TApplicationException.read(iprot_); > iprot_.readMessageEnd(); > throw x; > } > int seqid = msg.seqid; > Call call = calls.remove(seqid); > call.getResult().read(iprot_); > iprot_.readMessageEnd(); > call.complete(); > } > } catch (Exception e) { > e.printStackTrace(); > } > } > } > > class Call { > private int seqid; > private String func; > private TBase args; > private TBase result; > private ReentrantLock lock = new ReentrantLock(); > private boolean complete = false; > private Condition completeCond = lock.newCondition(); > > public Call(int seqid, String func, TBase args, TBase result) { > this.seqid = seqid; > this.func = func; > this.args = args; > this.result = result; > } > > public int getSeqid() { > return seqid; > } > > public String getFunc() { > return func; > } > > public TBase getArgs() { > return args; > } > > public TBase getResult() { > return result; > } > > void waitComplete() { > lock.lock(); > try { > if (!complete) { > completeCond.await(5000, TimeUnit.MILLISECONDS); > } > } catch (InterruptedException e) { > > } finally { > lock.unlock(); > } > } > > void complete() { > lock.lock(); > try { > complete = true; > completeCond.signal(); > } finally { > lock.unlock(); > } > } > } > > > But when I use this code in multi-threads, I get strange error as below: > > org.apache.thrift.transport.TTransportException > at > org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) > at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) > at > org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141) > at > org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101) > at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) > at > org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574) > at > org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443) > at thrift.Connection.run(TestRPCClient.java:77) > > > does anyone know what the problem is? how to do recovery? > > Any advice will be appreciated. > -- Adam Fisk http://www.littleshoot.org | http://adamfisk.wordpress.com | http://twitter.com/adamfisk
