Hi,
As far as I know, Python has no support for anything other than
blocking communication, non-blocking, or Twisted. Non-blocking is not
asynchronous, and we definitely don't want to use Twisted. I'm really
interested in adding support for an onCompleted/onError callback model.
After a little research, it looks like this would require a slightly modified
TNonblockingServer and some changes to the Python generator code. In
generated processors, instead of:
def process_add(self, seqid, iprot, oprot):
    """Handle one ``add`` call synchronously and write its reply.

    The arguments are read from ``iprot``, the handler's ``add`` is
    invoked, and the resulting ``add_result`` is written to ``oprot`` as
    a REPLY message tagged with ``seqid``.
    """
    args = add_args()
    args.read(iprot)
    iprot.readMessageEnd()
    result = add_result()
    try:
        result.success = self._handler.add(args.a, args.b)
    except Ex as ex:
        # The caught Ex is returned inside the result struct rather than
        # propagating out of the processor.
        result.ex = ex
    oprot.writeMessageBegin("add", TMessageType.REPLY, seqid)
    result.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
something like this:
def process(self, iprot, oprot, completed):
    """Read one message header and dispatch it to its handler method.

    ``completed`` is a zero-argument callable that must be invoked once
    the reply has been written to ``oprot``. For known methods it is
    forwarded to the entry in ``self._processMap``; for unknown methods
    the exception reply is written here, so it is invoked here as well.

    Returns True when the message was dispatched, None when the method
    name was unknown.
    """
    (name, mtype, seqid) = iprot.readMessageBegin()
    if name not in self._processMap:
        iprot.skip(TType.STRUCT)
        iprot.readMessageEnd()
        x = TApplicationException(TApplicationException.UNKNOWN_METHOD,
                                  'Unknown function %s' % (name))
        oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
        x.write(oprot)
        oprot.writeMessageEnd()
        oprot.trans.flush()
        # Nothing else will run for this request, so signal completion
        # now; otherwise the error reply is never handed back.
        completed()
        return
    self._processMap[name](self, seqid, iprot, oprot, completed)
    return True
def asyn_completed(self, completed, command, seqid, oprot, result, val):
    """Success callback: store ``val`` on ``result``, reply, then finish.

    ``val`` becomes ``result.success``; the result is written through
    ``self.async_call`` and ``completed()`` is invoked afterwards.
    """
    result.success = val
    self.async_call(command, seqid, oprot, result)
    # Signal completion only after the reply has been written.
    completed()
def asyn_error(self, completed, command, seqid, oprot, result, ex):
    """Error callback: store ``ex`` on ``result``, reply, then finish.

    ``ex`` becomes ``result.ex``; the result is written through
    ``self.async_call`` and ``completed()`` is invoked afterwards.
    """
    result.ex = ex
    self.async_call(command, seqid, oprot, result)
    # Signal completion only after the reply has been written.
    completed()
def async_call(self, command, seqid, oprot, result):
    """Write ``result`` to ``oprot`` as the REPLY message for ``command``.

    The message is tagged with ``seqid`` and the underlying transport is
    flushed once the message has been written.
    """
    oprot.writeMessageBegin(command, TMessageType.REPLY, seqid)
    result.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()
def process_add2(self, seqid, iprot, oprot, completed):
    """Handle one ``add`` call using the onCompleted/onError callback model.

    The handler's ``add`` receives two callbacks instead of returning a
    value: ``onCompleted(val)`` and ``onError(ex)``. Each is bound to
    this request's ``completed``, ``seqid``, ``oprot`` and ``result`` so
    the handler can call it at any later point. If the handler raises
    ``Ex`` synchronously, the error path is taken immediately.
    """
    args = add_args()
    args.read(iprot)
    iprot.readMessageEnd()
    result = add_result()
    try:
        self._handler.add(
            args.a, args.b,
            onCompleted=partial(self.asyn_completed,
                                completed, "add", seqid, oprot, result),
            onError=partial(self.asyn_error,
                            completed, "add", seqid, oprot, result))
    except Ex as ex:
        # asyn_error takes `completed` as its first argument (same order
        # as the partial above) and stores `ex` on the result itself.
        self.asyn_error(completed, "add", seqid, oprot, result, ex)
This would allow the user to handle blocking calls in their own code and pass
the result (or exception) back to Thrift at some point in the future.
The matching code for TAsyncServer could be almost exactly the same as in
TNonblockingServer, except for the Worker code, which could look like this
(only the processor.process call is changed):
class Worker(threading.Thread):
    """Worker is a small helper to process incoming connection."""

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    @staticmethod
    def _send_reply(callback, otrans):
        """Report success to ``callback`` with the bytes now held in ``otrans``."""
        # NOTE(review): stock TNonblockingServer calls otrans.getvalue()
        # (lowercase); the draft's getValue() looked like a typo — confirm
        # against the transport actually in use.
        callback(True, otrans.getvalue())

    def run(self):
        """Process queries from task queue, stop if processor is None.

        Each task is a ``(processor, iprot, oprot, otrans, callback)``
        tuple. The processor is given a zero-argument completion callable;
        when invoked, it passes ``True`` and the output buffer's contents
        to ``callback``. If processing raises, ``callback(False, '')`` is
        called instead.
        """
        while True:
            try:
                processor, iprot, oprot, otrans, callback = self.queue.get()
                if processor is None:
                    break
                # Bind callback/otrans now, but read the buffer only when
                # the completion fires: the reply has not been written yet,
                # and the loop variables may be rebound by a later task
                # before an asynchronous handler finishes.
                processor.process(
                    iprot, oprot,
                    partial(self._send_reply, callback, otrans))
            except Exception:
                logging.exception("Exception while processing request")
                callback(False, '')
What do you think about this? Or maybe I'm going in a totally wrong direction?
Piotr Nowojski