Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6dcc429e5 -> f19f767b0
Fix the pickle issue with the inconsistency of dill load and dump session Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aef4858b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aef4858b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aef4858b Branch: refs/heads/python-sdk Commit: aef4858b80dfacf3401e6672b9373c82a8e77027 Parents: 6dcc429 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Fri Dec 2 15:02:18 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Tue Dec 6 10:14:10 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/internal/apiclient.py | 19 +++++++++++++++++-- sdks/python/apache_beam/internal/pickler.py | 8 ++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/apiclient.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index c5f5f70..f1341a7 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -46,7 +46,6 @@ from apache_beam.utils.options import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow - BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' COMPUTE_API_SERVICE = 'compute.googleapis.com' STORAGE_API_SERVICE = 'storage.googleapis.com' @@ -55,13 +54,19 @@ STORAGE_API_SERVICE = 'storage.googleapis.com' class Step(object): """Wrapper for a dataflow Step protobuf.""" - def __init__(self, step_kind, step_name): + def __init__(self, step_kind, step_name, additional_properties=None): self.step_kind = step_kind self.step_name = 
step_name
     self.proto = dataflow.Step(kind=step_kind, name=step_name)
     self.proto.properties = {}
+    self._additional_properties = []
+
+    if additional_properties is not None:
+      for (n, v, t) in additional_properties:
+        self.add_property(n, v, t)
 
   def add_property(self, name, value, with_type=False):
+    self._additional_properties.append((name, value, with_type))
     self.proto.properties.additionalProperties.append(
         dataflow.Step.PropertiesValue.AdditionalProperty(
             key=name, value=to_json_value(value, with_type=with_type)))
@@ -77,6 +82,11 @@ class Step(object):
         outputs.append(entry_prop.value.string_value)
     return outputs
 
+  def __reduce__(self):
+    """Reduce hook for pickling the Step class more easily
+    """
+    return (Step, (self.step_kind, self.step_name, self._additional_properties))
+
   def get_output(self, tag=None):
     """Returns name if it is one of the outputs or first output if name is None.
 
@@ -330,6 +340,11 @@ class Job(object):
   def json(self):
     return encoding.MessageToJson(self.proto)
 
+  def __reduce__(self):
+    """Reduce hook for pickling the Job class more easily
+    """
+    return (Job, (self.options,))
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query jobs."""
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 30f0b77..d39a497 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -204,6 +204,14 @@ def loads(encoded):
 
 
 def dump_session(file_path):
+  """Pickle the current python session to be used in the worker.
+
+  Note: Due to the inconsistency in the first dump of dill dump_session we
+  create and load the dump twice to have consistent results in the worker and
+  the running session.
Check: https://github.com/uqfoundation/dill/issues/195
+  """
+  dill.dump_session(file_path)
+  dill.load_session(file_path)
   return dill.dump_session(file_path)