Hi,

As far as I know, Python has no support for anything other than blocking communication, non-blocking communication, or Twisted. Non-blocking is not asynchronous, and we definitely don't want to use Twisted. I'm really interested in adding support for an onCompleted/onError callback model. After a little research, it looks like this would require a slightly modified TNonblockingServer and some changes in the Python generator code. In generated processors, instead of:

  def process_add(self, seqid, iprot, oprot):
    """Handle one synchronous "add" request and write its reply.

    Reads the call arguments from iprot, invokes the handler's add(), stores
    either its return value or a raised Ex in the result struct, and writes
    that struct to oprot as a REPLY message tagged with seqid.
    """
    request = add_args()
    request.read(iprot)
    iprot.readMessageEnd()
    reply = add_result()
    try:
      reply.success = self._handler.add(request.a, request.b)
    except Ex as caught:
      # A declared exception travels back inside the result struct.
      reply.ex = caught
    oprot.writeMessageBegin("add", TMessageType.REPLY, seqid)
    reply.write(oprot)
    oprot.writeMessageEnd()
    oprot.trans.flush()

the generator would emit something like this:

  def process(self, iprot, oprot, completed):
    """Dispatch one incoming message to its asynchronous process_* method.

    iprot/oprot are the input and output protocols for this request.
    completed is a zero-argument callable that is invoked once the reply has
    been written to oprot.

    Returns True when the call was handed to a registered method, and None
    when the method name is unknown (an EXCEPTION reply is written instead).
    """
    # mtype is unused here; it is named so it does not shadow builtin type().
    (name, mtype, seqid) = iprot.readMessageBegin()
    if name not in self._processMap:
      iprot.skip(TType.STRUCT)
      iprot.readMessageEnd()
      x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
      oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
      x.write(oprot)
      oprot.writeMessageEnd()
      oprot.trans.flush()
      # No process_* method will run for this request, so nobody else would
      # ever signal completion; do it here so the exception reply is not
      # left sitting in the output buffer.
      completed()
      return
    # The selected process_* method is responsible for calling completed().
    self._processMap[name](self, seqid, iprot, oprot, completed)
    return True

  def asyn_completed(self, completed, command, seqid, oprot, result, val):
    """Success callback: record val as the result, send it, then signal completion.

    completed is called with no arguments after the reply has been written
    via async_call().
    """
    result.success = val
    reply_args = (command, seqid, oprot, result)
    self.async_call(*reply_args)
    # Tell the caller the reply for this request has been written.
    completed()

  def asyn_error(self, completed, command, seqid, oprot, result, ex):
    """Error callback: record ex on the result, send it, then signal completion.

    completed is called with no arguments after the reply has been written
    via async_call().
    """
    result.ex = ex
    reply_args = (command, seqid, oprot, result)
    self.async_call(*reply_args)
    # Tell the caller the reply for this request has been written.
    completed()

  def async_call(self, command, seqid, oprot, result):
    """Write result to oprot as the REPLY message for command/seqid and flush."""
    oprot.writeMessageBegin(command, TMessageType.REPLY, seqid)
    result.write(oprot)
    oprot.writeMessageEnd()
    # Flush the underlying transport so the serialized reply is pushed out.
    transport = oprot.trans
    transport.flush()

  def process_add2(self, seqid, iprot, oprot, completed):
    """Handle one asynchronous "add" request.

    Reads the call arguments from iprot and invokes the handler's add() with
    two extra keyword arguments: onCompleted (called with the return value)
    and onError (called with an exception). Whichever callback fires writes
    the reply to oprot and then invokes completed().

    If the handler raises Ex synchronously, the error path is taken
    immediately on the calling thread.
    """
    args = add_args()
    args.read(iprot)
    iprot.readMessageEnd()
    result = add_result()
    # Pre-bind everything the callbacks need so the handler only has to
    # supply the value (or the exception).
    on_completed = partial(self.asyn_completed, completed, "add", seqid, oprot, result)
    on_error = partial(self.asyn_error, completed, "add", seqid, oprot, result)
    try:
      self._handler.add(args.a, args.b, onCompleted=on_completed, onError=on_error)
    except Ex as ex:
      # asyn_error() stores ex on result itself and, unlike the original
      # call here, receives `completed` so completion is actually signalled.
      on_error(ex)

This would allow the user to handle blocking calls in their own code and pass the result (or exception) back to Thrift at some later point. The matching code for TAsyncServer could be almost exactly the same as in TNonblockingServer, except for the Worker code, which could look like this (only the processor.process call is changed):

class Worker(threading.Thread):
    """Worker is a small helper to process incoming connection.

    Each queue item is a (processor, iprot, oprot, otrans, callback) tuple.
    The processor is given a completion callable; when that is invoked,
    callback(True, <contents of otrans>) is called. If process() raises,
    callback(False, '') is called instead.
    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    @staticmethod
    def _complete(callback, otrans):
        """Hand the reply buffered in otrans to callback as a success."""
        # Read the buffer now, at completion time, so it contains whatever
        # the processor wrote. Reading it when the completion callable is
        # built would capture the buffer before any reply exists.
        # NOTE(review): the method is spelled getValue() in the proposal;
        # confirm against the actual transport class used for otrans.
        callback(True, otrans.getValue())

    def run(self):
        """Process queries from task queue, stop if processor is None."""
        while True:
            try:
                processor, iprot, oprot, otrans, callback = self.queue.get()
                if processor is None:
                    break
                # partial() binds this request's callback/otrans by value, so
                # a completion fired after the loop has moved on to another
                # request still answers the right connection.
                processor.process(iprot, oprot, partial(self._complete, callback, otrans))
            except Exception:
                logging.exception("Exception while processing request")
                callback(False, '')

What do you think about this? Or am I going in totally the wrong direction?

Piotr Nowojski

Reply via email to