Repository: beam Updated Branches: refs/heads/master 1b31167fb -> fe625678f
Remove some internal details from the public API. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98e685d8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98e685d8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98e685d8 Branch: refs/heads/master Commit: 98e685d85a4cf75faee8156516d7d3aff60077f3 Parents: 1b31167 Author: Robert Bradshaw <rober...@google.com> Authored: Wed May 10 15:55:46 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Thu May 11 15:43:17 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 4 ++-- .../runners/dataflow/dataflow_runner.py | 6 +++--- .../runners/dataflow/dataflow_runner_test.py | 6 +++--- .../apache_beam/runners/direct/executor.py | 2 +- .../runners/direct/transform_evaluator.py | 10 ++++----- sdks/python/apache_beam/transforms/__init__.py | 2 +- sdks/python/apache_beam/transforms/core.py | 22 ++++++++++---------- .../python/apache_beam/transforms/ptransform.py | 6 +++--- .../apache_beam/transforms/ptransform_test.py | 12 +++++------ sdks/python/apache_beam/typehints/typecheck.py | 2 +- 10 files changed, 36 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 79480d7..5048534 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -77,8 +77,8 @@ class Pipeline(object): the PValues are the edges. All the transforms applied to the pipeline must have distinct full labels. - If same transform instance needs to be applied then a clone should be created - with a new label (e.g., transform.clone('new label')). 
+ If same transform instance needs to be applied then the right shift operator + should be used to designate new names (e.g. `input | "label" >> my_transform`). """ def __init__(self, runner=None, options=None, argv=None): http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index da8de9d..0ecd22a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -160,7 +160,7 @@ class DataflowRunner(PipelineRunner): class GroupByKeyInputVisitor(PipelineVisitor): """A visitor that replaces `Any` element type for input `PCollection` of - a `GroupByKey` or `GroupByKeyOnly` with a `KV` type. + a `GroupByKey` or `_GroupByKeyOnly` with a `KV` type. TODO(BEAM-115): Once Python SDk is compatible with the new Runner API, we could directly replace the coder instead of mutating the element type. @@ -169,8 +169,8 @@ class DataflowRunner(PipelineRunner): def visit_transform(self, transform_node): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.transforms.core import GroupByKey, GroupByKeyOnly - if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)): + from apache_beam.transforms.core import GroupByKey, _GroupByKeyOnly + if isinstance(transform_node.transform, (GroupByKey, _GroupByKeyOnly)): pcoll = transform_node.inputs[0] input_type = pcoll.element_type # If input_type is not specified, then treat it as `Any`. 
http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index ac9b028..ff4b51d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -37,7 +37,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeExceptio from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms.display import DisplayDataItem -from apache_beam.transforms.core import GroupByKeyOnly +from apache_beam.transforms.core import _GroupByKeyOnly from apache_beam.typehints import typehints # Protect against environments where apitools library is not available. 
@@ -185,7 +185,7 @@ class DataflowRunnerTest(unittest.TestCase): pcoll1 = PCollection(p) pcoll2 = PCollection(p) pcoll3 = PCollection(p) - for transform in [GroupByKeyOnly(), beam.GroupByKey()]: + for transform in [_GroupByKeyOnly(), beam.GroupByKey()]: pcoll1.element_type = None pcoll2.element_type = typehints.Any pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any] @@ -199,7 +199,7 @@ class DataflowRunnerTest(unittest.TestCase): p = TestPipeline() pcoll1 = PCollection(p) pcoll2 = PCollection(p) - for transform in [GroupByKeyOnly(), beam.GroupByKey()]: + for transform in [_GroupByKeyOnly(), beam.GroupByKey()]: pcoll1.element_type = typehints.TupleSequenceConstraint pcoll2.element_type = typehints.Set err_msg = "Input to GroupByKey must be of Tuple or Any type" http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index 9efbede..86db291 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -155,7 +155,7 @@ class _SerialEvaluationState(_TransformEvaluationState): item of work will be submitted to the ExecutorService at any time. A principal use of this is for evaluators that keeps a global state such as - GroupByKeyOnly. + _GroupByKeyOnly. 
""" def __init__(self, executor_service, scheduled): http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 6984ded..b1cb626 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -53,7 +53,7 @@ class TransformEvaluatorRegistry(object): io.Read: _BoundedReadEvaluator, core.Flatten: _FlattenEvaluator, core.ParDo: _ParDoEvaluator, - core.GroupByKeyOnly: _GroupByKeyOnlyEvaluator, + core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator, _NativeWrite: _NativeWriteEvaluator, } @@ -83,7 +83,7 @@ class TransformEvaluatorRegistry(object): """Returns True if this applied_ptransform should run one bundle at a time. Some TransformEvaluators use a global state object to keep track of their - global execution state. For example evaluator for GroupByKeyOnly uses this + global execution state. For example evaluator for _GroupByKeyOnly uses this state as an in memory dictionary to buffer keys. Serially executed evaluators will act as syncing point in the graph and @@ -99,7 +99,7 @@ class TransformEvaluatorRegistry(object): True if executor should execute applied_ptransform serially. 
""" return isinstance(applied_ptransform.transform, - (core.GroupByKeyOnly, _NativeWrite)) + (core._GroupByKeyOnly, _NativeWrite)) class _TransformEvaluator(object): @@ -325,7 +325,7 @@ class _ParDoEvaluator(_TransformEvaluator): class _GroupByKeyOnlyEvaluator(_TransformEvaluator): - """TransformEvaluator for GroupByKeyOnly transform.""" + """TransformEvaluator for _GroupByKeyOnly transform.""" MAX_ELEMENT_PER_BUNDLE = None @@ -369,7 +369,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): k, v = element.value self.state.output[self.key_coder.encode(k)].append(v) else: - raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of ' + raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of ' 'windowed key-value pairs. Instead received: %r.' % element) http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/__init__.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py index 847fb8f..b77b0f6 100644 --- a/sdks/python/apache_beam/transforms/__init__.py +++ b/sdks/python/apache_beam/transforms/__init__.py @@ -21,5 +21,5 @@ from apache_beam.transforms import combiners from apache_beam.transforms.core import * from apache_beam.transforms.ptransform import * -from apache_beam.transforms.timeutil import * +from apache_beam.transforms.timeutil import TimeDomain from apache_beam.transforms.util import * http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index abe699f..0e497f9 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -215,7 +215,7 @@ class DoFn(WithTypeHints, HasDisplayData): return Any return 
type_hint - def process_argspec_fn(self): + def _process_argspec_fn(self): """Returns the Python callable that will eventually be invoked. This should ideally be the user-level function that is called with @@ -307,7 +307,7 @@ class CallableWrapperDoFn(DoFn): return self._strip_output_annotations( trivial_inference.infer_return_type(self._fn, [input_type])) - def process_argspec_fn(self): + def _process_argspec_fn(self): return getattr(self._fn, '_argspec_fn', self._fn) @@ -641,8 +641,8 @@ class ParDo(PTransformWithSideInputs): return fn return CallableWrapperDoFn(fn) - def process_argspec_fn(self): - return self.fn.process_argspec_fn() + def _process_argspec_fn(self): + return self.fn._process_argspec_fn() def display_data(self): return {'fn': DisplayDataItem(self.fn.__class__, @@ -870,19 +870,19 @@ class CombineGlobally(PTransform): def default_label(self): return 'CombineGlobally(%s)' % ptransform.label_from_callable(self.fn) - def clone(self, **extra_attributes): + def _clone(self, **extra_attributes): clone = copy.copy(self) clone.__dict__.update(extra_attributes) return clone def with_defaults(self, has_defaults=True): - return self.clone(has_defaults=has_defaults) + return self._clone(has_defaults=has_defaults) def without_defaults(self): return self.with_defaults(False) def as_singleton_view(self): - return self.clone(as_view=True) + return self._clone(as_view=True) def expand(self, pcoll): def add_input_types(transform): @@ -964,7 +964,7 @@ class CombinePerKey(PTransformWithSideInputs): def default_label(self): return '%s(%s)' % (self.__class__.__name__, self._fn_label) - def process_argspec_fn(self): + def _process_argspec_fn(self): return self.fn._fn # pylint: disable=protected-access def expand(self, pcoll): @@ -1133,7 +1133,7 @@ class GroupByKey(PTransform): return (pcoll | 'ReifyWindows' >> (ParDo(self.ReifyWindows()) .with_output_types(reify_output_type)) - | 'GroupByKey' >> (GroupByKeyOnly() + | 'GroupByKey' >> (_GroupByKeyOnly() 
.with_input_types(reify_output_type) .with_output_types(gbk_input_type)) | ('GroupByWindow' >> ParDo( @@ -1144,14 +1144,14 @@ class GroupByKey(PTransform): # The input_type is None, run the default return (pcoll | 'ReifyWindows' >> ParDo(self.ReifyWindows()) - | 'GroupByKey' >> GroupByKeyOnly() + | 'GroupByKey' >> _GroupByKeyOnly() | 'GroupByWindow' >> ParDo( self.GroupAlsoByWindow(pcoll.windowing))) @typehints.with_input_types(typehints.KV[K, V]) @typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]]) -class GroupByKeyOnly(PTransform): +class _GroupByKeyOnly(PTransform): """A group by key transform, ignoring windows.""" def infer_output_type(self, input_type): key_type, value_type = trivial_inference.key_value_types(input_type) http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 8898c36..bd2a120 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -303,7 +303,7 @@ class PTransform(WithTypeHints, HasDisplayData): # TODO(ccy): further refine this API. 
return None - def clone(self, new_label): + def _clone(self, new_label): """Clones the current transform instance under a new label.""" transform = copy.copy(self) transform.label = new_label @@ -567,7 +567,7 @@ class PTransformWithSideInputs(PTransform): arg_types = [pvalueish.element_type] + [element_type(v) for v in args] kwargs_types = {k: element_type(v) for (k, v) in kwargs.items()} - argspec_fn = self.process_argspec_fn() + argspec_fn = self._process_argspec_fn() bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types) hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1]) for arg, hint in hints.items(): @@ -581,7 +581,7 @@ class PTransformWithSideInputs(PTransform): 'Type hint violation for \'%s\': requires %s but got %s for %s' % (self.label, hint, bindings[arg], arg)) - def process_argspec_fn(self): + def _process_argspec_fn(self): """Returns an argspec of the function actually consuming the data. """ raise NotImplementedError http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index f790660..efc5978 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -35,7 +35,7 @@ import apache_beam.pvalue as pvalue from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import window -from apache_beam.transforms.core import GroupByKeyOnly +from apache_beam.transforms.core import _GroupByKeyOnly import apache_beam.transforms.combiners as combine from apache_beam.transforms.display import DisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform @@ -580,7 +580,7 @@ class 
PTransformTest(unittest.TestCase): pipeline = TestPipeline() pcolls = pipeline | 'A' >> beam.Create(['a', 'b', 'f']) with self.assertRaises(typehints.TypeCheckError) as cm: - pcolls | 'D' >> GroupByKeyOnly() + pcolls | 'D' >> _GroupByKeyOnly() pipeline.run() expected_error_prefix = ('Input type hint violation at D: expected ' @@ -1088,7 +1088,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str) | ('Pair' >> beam.Map(lambda x: (x, ord(x))) .with_output_types(typehints.KV[str, str])) - | GroupByKeyOnly()) + | _GroupByKeyOnly()) # Output type should correctly be deduced. # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]]. @@ -1112,7 +1112,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]).with_output_types(int) - | 'F' >> GroupByKeyOnly()) + | 'F' >> _GroupByKeyOnly()) self.assertEqual("Input type hint violation at F: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1155,7 +1155,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): (self.p | 'Nums' >> beam.Create(range(5)).with_output_types(int) | 'ModDup' >> beam.Map(lambda x: (x % 2, x)) - | GroupByKeyOnly()) + | _GroupByKeyOnly()) self.assertEqual('Pipeline type checking is enabled, however no output ' 'type-hint was found for the PTransform ' @@ -1978,7 +1978,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): def test_gbk_type_inference(self): self.assertEqual( typehints.Tuple[str, typehints.Iterable[int]], - GroupByKeyOnly().infer_output_type(typehints.KV[str, int])) + _GroupByKeyOnly().infer_output_type(typehints.KV[str, int])) def test_pipeline_inference(self): created = self.p | beam.Create(['a', 'b', 'c']) http://git-wip-us.apache.org/repos/asf/beam/blob/98e685d8/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index 09b73f9..89a5f5c 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -109,7 +109,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper): def __init__(self, dofn, type_hints, label=None): super(TypeCheckWrapperDoFn, self).__init__(dofn) self.dofn = dofn - self._process_fn = self.dofn.process_argspec_fn() + self._process_fn = self.dofn._process_argspec_fn() if type_hints.input_types: input_args, input_kwargs = type_hints.input_types self._input_hints = getcallargs_forhints(