Hi,
By default, a thrift client can not be used by multi-threads. So I extend
the generated code and implement a thread safe version. the service
defination:
service TestRPC {
binary search(1:binary param)
}
here is my extended client code:
public class ThreadSafeTestRPCClient extends TestRPC.Client {
AtomicInteger idGenerator = new AtomicInteger(1);
Connection conn;
public TestRPCClient(TProtocol prot) {
super(prot);
conn = new Connection(prot);
conn.start();
}
public ByteBuffer search(ByteBuffer param) throws
org.apache.thrift.TException {
int seqid = idGenerator.getAndIncrement();
String func = "search";
search_args args = new search_args();
args.setParam(param);
search_result result = new search_result();
Call call = new Call(seqid, func, args, result);
conn.sendParam(call);
call.waitComplete();
return ((search_result)call.getResult()).success;
}
}
class Connection extends Thread {
org.apache.thrift.protocol.TProtocol iprot_;
org.apache.thrift.protocol.TProtocol oprot_;
protected ReentrantLock sendLock = new ReentrantLock();
Map<Integer, Call> calls = new ConcurrentHashMap<Integer, Call>();
public Connection(org.apache.thrift.protocol.TProtocol prot) {
this(prot, prot);
}
public Connection(org.apache.thrift.protocol.TProtocol iprot,
org.apache.thrift.protocol.TProtocol oprot) {
this.iprot_ = iprot;
this.oprot_ = oprot;
}
public void sendParam(Call call) throws TException {
calls.put(call.getSeqid(), call);
sendLock.lock();
try {
oprot_.writeMessageBegin(new
org.apache.thrift.protocol.TMessage(call.getFunc(),
org.apache.thrift.protocol.TMessageType.CALL, call.getSeqid()));
call.getArgs().write(oprot_);
oprot_.writeMessageEnd();
oprot_.getTransport().flush();
} finally {
sendLock.unlock();
}
}
public void run() {
try {
while (true) {
org.apache.thrift.protocol.TMessage msg =
iprot_.readMessageBegin();
if (msg.type ==
org.apache.thrift.protocol.TMessageType.EXCEPTION) {
org.apache.thrift.TApplicationException x =
org.apache.thrift.TApplicationException.read(iprot_);
iprot_.readMessageEnd();
throw x;
}
int seqid = msg.seqid;
Call call = calls.remove(seqid);
call.getResult().read(iprot_);
iprot_.readMessageEnd();
call.complete();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Call {
private int seqid;
private String func;
private TBase args;
private TBase result;
private ReentrantLock lock = new ReentrantLock();
private boolean complete = false;
private Condition completeCond = lock.newCondition();
public Call(int seqid, String func, TBase args, TBase result) {
this.seqid = seqid;
this.func = func;
this.args = args;
this.result = result;
}
public int getSeqid() {
return seqid;
}
public String getFunc() {
return func;
}
public TBase getArgs() {
return args;
}
public TBase getResult() {
return result;
}
void waitComplete() {
lock.lock();
try {
if (!complete) {
completeCond.await(5000, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
void complete() {
lock.lock();
try {
complete = true;
completeCond.signal();
} finally {
lock.unlock();
}
}
}
But when I use this code in multi-threads, I get strange error as below:
org.apache.thrift.transport.TTransportException
at
org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:141)
at
org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101)
at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
at
org.apache.thrift.protocol.TCompactProtocol.readByte(TCompactProtocol.java:574)
at
org.apache.thrift.protocol.TCompactProtocol.readMessageBegin(TCompactProtocol.java:443)
at thrift.Connection.run(TestRPCClient.java:77)
does anyone know what the problem is? how to do recovery?
Any advice will be appreciated.