Repository: incubator-beam Updated Branches: refs/heads/python-sdk a1a51c3c1 -> d898d56ae
Handle HttpError in GCS upload thread * Break the connection to the main thread and propagate the exception. * Retry in auth _refresh() to guard against temporary errors in the metadata service. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4267d26 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4267d26 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4267d26 Branch: refs/heads/python-sdk Commit: a4267d264395706f12479aa876501a62d5b679b7 Parents: a1a51c3 Author: Ahmet Altay <al...@google.com> Authored: Fri Jul 8 16:28:07 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Thu Jul 14 18:15:34 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/internal/auth.py | 2 ++ sdks/python/apache_beam/io/gcsio.py | 23 ++++++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/internal/auth.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py index 0081970..f324a2d 100644 --- a/sdks/python/apache_beam/internal/auth.py +++ b/sdks/python/apache_beam/internal/auth.py @@ -82,6 +82,8 @@ class GCEMetadataCredentials(OAuth2Credentials): None, # token_uri user_agent) + @retry.with_exponential_backoff( + retry_filter=retry.retry_on_server_errors_and_timeout_filter) def _refresh(self, http_request): refresh_time = datetime.datetime.now() req = urllib2.Request('http://metadata.google.internal/computeMetadata/v1/' http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git
a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index a01988b..c61f251 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -533,6 +533,7 @@ class GcsBufferedWriter(object): # Set up communication with uploading thread. parent_conn, child_conn = multiprocessing.Pipe() + self.child_conn = child_conn self.conn = parent_conn # Set up uploader. @@ -547,6 +548,7 @@ class GcsBufferedWriter(object): # Start uploading thread. self.upload_thread = threading.Thread(target=self._start_upload) self.upload_thread.daemon = True + self.upload_thread.last_error = None self.upload_thread.start() # TODO(silviuc): Refactor so that retry logic can be applied. @@ -560,7 +562,15 @@ class GcsBufferedWriter(object): # # The uploader by default transfers data in chunks of 1024 * 1024 bytes at # a time, buffering writes until that size is reached. - self.client.objects.Insert(self.insert_request, upload=self.upload) + try: + self.client.objects.Insert(self.insert_request, upload=self.upload) + except HttpError as http_error: + logging.error( + 'HTTP error while inserting file %s: %s', self.path, http_error) + self.upload_thread.last_error = http_error + raise + finally: + self.child_conn.close() def write(self, data): """Write data to a GCS file. @@ -574,8 +584,14 @@ class GcsBufferedWriter(object): self._check_open() if not data: return - self.conn.send_bytes(data) - self.position += len(data) + try: + self.conn.send_bytes(data) + self.position += len(data) + except IOError: + if self.upload_thread.last_error: + raise self.upload_thread.last_error # pylint: disable=raising-bad-type + else: + raise def tell(self): """Return the total number of bytes passed to write() so far.""" @@ -583,6 +599,7 @@ class GcsBufferedWriter(object): def close(self): """Close the current GCS file.""" + self.closed = True self.conn.close() self.upload_thread.join()