fixed error handling for read task, using read task as done future on resultset
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/00786389 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/00786389 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/00786389 Branch: refs/heads/TINKERPOP-1599 Commit: 00786389303601b0da9dc2bbcc8a43f0497aebef Parents: 2577a13 Author: davebshow <davebs...@gmail.com> Authored: Sat Jan 28 12:16:47 2017 -0500 Committer: davebshow <davebs...@gmail.com> Committed: Mon Jan 30 11:51:24 2017 -0500 ---------------------------------------------------------------------- .../jython/gremlin_python/driver/connection.py | 3 ++- .../driver/driver_remote_connection.py | 6 +++-- .../jython/gremlin_python/driver/protocol.py | 15 +++++++----- .../gremlin_python/driver/remote_connection.py | 11 ++++----- .../jython/gremlin_python/driver/resultset.py | 24 ++++++++++++++++---- 5 files changed, 39 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py index 2f59883..44ca8a3 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/connection.py @@ -63,7 +63,8 @@ class Connection: future.set_exception(e) else: # Start receive task - self._executor.submit(self._receive) + done = self._executor.submit(self._receive) + result_set.done = done future.set_result(result_set) future_write.add_done_callback(cb) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py index 4e70a87..104c1f7 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/driver_remote_connection.py @@ -20,10 +20,10 @@ from concurrent.futures import Future from gremlin_python.driver import client from gremlin_python.driver.remote_connection import ( - RemoteTraversal, RemoteTraversalSideEffects) + RemoteConnection, RemoteTraversal, RemoteTraversalSideEffects) -class DriverRemoteConnection: +class DriverRemoteConnection(RemoteConnection): def __init__(self, url, traversal_source, protocol_factory=None, transport_factory=None, pool_size=None, max_workers=None, @@ -31,6 +31,8 @@ class DriverRemoteConnection: self._client = client.Client(url, traversal_source, protocol_factory, transport_factory, pool_size, max_workers, None, username, password) + self._url = self._client._url + self._traversal_source = self._client._traversal_source def close(self): self._client.close() http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py index 6ea17a5..df72bf7 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/protocol.py @@ -27,6 +27,10 @@ import six from gremlin_python.driver import serializer, request +class GremlinServerError(Exception): + pass + + @six.add_metaclass(abc.ABCMeta) class AbstractBaseProtocol: @@ -66,7 +70,7 @@ class GremlinServerWSProtocol(AbstractBaseProtocol): result_set = results_dict[request_id] status_code = data['status']['code'] aggregate_to = data['result']['meta'].get('aggregateTo', 'list') - result_set._aggregate_to = aggregate_to + result_set.aggregate_to = aggregate_to if status_code == 407: auth = b''.join([b'\x00', self._username.encode('utf-8'), b'\x00', self._password.encode('utf-8')]) @@ -77,7 +81,7 @@ class GremlinServerWSProtocol(AbstractBaseProtocol): data = self._transport.read() self.data_received(data, results_dict) elif status_code == 204: - result_set.done.set_result(None) + result_set.stream.put_nowait([]) del results_dict[request_id] elif status_code in [200, 206]: results = [] @@ -89,10 +93,9 @@ class GremlinServerWSProtocol(AbstractBaseProtocol): data = self._transport.read() self.data_received(data, results_dict) else: - result_set.done.set_result(None) + # result_set.done.set_result(None) del results_dict[request_id] else: - result_set.stream.put_nowait(GremlinServerError( - "{0}: {1}".format(status_code, data["status"]["message"]))) - result_set.done.set_result(None) del results_dict[request_id] + raise GremlinServerError( + "{0}: {1}".format(status_code, data["status"]["message"])) http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py index a95ea10..4d34e68 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/remote_connection.py @@ -77,6 +77,7 @@ class RemoteTraversalSideEffects(traversal.TraversalSideEffects): return self._keys def get(self, key): + if not self._side_effects.get(key): if not self._closed: message = request.RequestMessage( @@ -101,17 +102,15 @@ class RemoteTraversalSideEffects(traversal.TraversalSideEffects): return results def _aggregate_results(self, result_set): - # Need to double check how all this works - aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}} + aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {}, + 'none': None} results = None for msg in result_set: if results is None: - aggregate_to = result_set._aggregate_to - if aggregate_to: - results = aggregates.get(aggregate_to, []) + aggregate_to = result_set.aggregate_to + results = aggregates.get(aggregate_to, []) # on first message, get the right result data structure # if there is no update to a structure, then the item is the result - # not really sure about this... if results is None: results = msg[0] # updating a map is different than a list or a set http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/00786389/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py ---------------------------------------------------------------------- diff --git a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py index e12e0a0..01c1968 100644 --- a/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py +++ b/gremlin-python/src/main/jython/gremlin_python/driver/resultset.py @@ -24,10 +24,18 @@ class ResultSet: def __init__(self, stream, request_id): self._stream = stream self._request_id = request_id - self._done = Future() + self._done = None self._aggregate_to = None @property + def aggregate_to(self): + return self._aggregate_to + + @aggregate_to.setter + def aggregate_to(self, val): + self._aggregate_to = val + + @property def request_id(self): return self._request_id @@ -51,11 +59,17 @@ class ResultSet: def done(self): return self._done + @done.setter + def done(self, future): + self._done = future + def one(self): - if self.stream.empty() and self.done.done(): - return - result = self.stream.get() - return result + while not self.done.done(): + if not self.stream.empty(): + return self.stream.get_nowait() + if not self.stream.empty(): + return self.stream.get_nowait() + return self.done.result() def all(self): future = Future()