[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86706=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86706 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 19:08
Start Date: 02/Apr/18 19:08
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index d0ab55f5462..893e32e3357 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -51,10 +51,10 @@ class _SideInputView(object):

   def __init__(self, view):
     self._view = view
-    self.callable_queue = collections.deque()
+    self.blocked_tasks = collections.deque()
     self.elements = []
     self.value = None
-    self.has_result = False
+    self.watermark = None

   def __repr__(self):
     elements_string = (', '.join(str(elm) for elm in self.elements)
@@ -69,67 +69,108 @@ class _SideInputsContainer(object):
   to a side input.
   """

-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)

-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)

   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string

-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It gets the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: ``_UnpickledSideInput`` value.
+      task: ``TransformExecutor`` task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The ``SideInputMap`` value of a view when the tasks it blocks are
+      unblocked. Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.output_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)

   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)

-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unbloks tasks.
+
+    It traverses the list of side inputs per PTransform
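The blocking behaviour introduced by the diff in this message can be illustrated with a simplified, standalone sketch. It is not the Beam implementation: the class names `SideInputView`, `SideInputsContainer` and `Task`, and the method `update_watermark_and_unblock_tasks`, are hypothetical stand-ins, and the watermark is modelled as a plain integer timestamp rather than the watermark object (with `output_watermark`) that the diff uses.

```python
import collections
import threading


class SideInputView:
    """Per-side-input state: buffered elements, blocked tasks, last watermark."""

    def __init__(self):
        self.elements = []
        self.blocked_tasks = collections.deque()  # (task, block_until) pairs
        self.watermark = None  # None until the producer reports progress


class Task:
    """Hypothetical stand-in for a TransformExecutor."""

    def __init__(self, name):
        self.name = name
        self.blocked = False


class SideInputsContainer:
    """Simplified container; assumes integer timestamps for watermarks."""

    def __init__(self, side_inputs):
        self._lock = threading.Lock()
        self._views = {side: SideInputView() for side in side_inputs}

    def add_values(self, side_input, values):
        with self._lock:
            self._views[side_input].elements.extend(values)

    def get_value_or_block_until_ready(self, side_input, task, block_until):
        """Returns the buffered elements if ready, else blocks the task."""
        with self._lock:
            view = self._views[side_input]
            if view.watermark is not None and view.watermark >= block_until:
                return list(view.elements)
            view.blocked_tasks.append((task, block_until))
            task.blocked = True
            return None

    def update_watermark_and_unblock_tasks(self, side_input, watermark):
        """Records the new watermark and returns the tasks it releases."""
        with self._lock:
            view = self._views[side_input]
            view.watermark = watermark
            unblocked, still_blocked = [], collections.deque()
            for task, block_until in view.blocked_tasks:
                if watermark >= block_until:
                    task.blocked = False
                    unblocked.append(task)
                else:
                    still_blocked.append((task, block_until))
            view.blocked_tasks = still_blocked
            return unblocked
```

In this sketch a task that asks for a side input too early is queued together with its threshold, and a later watermark update releases only the tasks whose threshold has been reached, so the caller can reschedule them.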
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86694=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86694 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m
Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178612873

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
Done!

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---

Worklog Id: (was: 86694)
Time Spent: 11h (was: 10h 50m)

> Add support for the streaming side inputs in the Python DirectRunner
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: María GH
> Assignee: María GH
> Priority: Minor
> Fix For: 3.0.0
>
> Time Spent: 11h
> Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.
> Currently, side inputs are only available for globally-windowed side input
> PCollections.
> Also, empty side inputs cause a pipeline stall.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
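The code comment under review in the message above ("For non-empty bundles, store the window of the max EOW") describes picking, among the windows of a bundle's elements, the one whose end-of-window timestamp is largest. A minimal sketch of that selection follows. The `Window` tuple and the `latest_window` function are hypothetical names for illustration and are not taken from the Beam code base; how the executor uses the result is not shown in this thread, though the window's end might plausibly serve as the threshold a task waits on.

```python
import collections

# Hypothetical stand-in for a window with start/end timestamps.
Window = collections.namedtuple("Window", ["start", "end"])


def latest_window(element_windows):
    """Returns the window with the largest end timestamp, or None if empty."""
    if not element_windows:
        # Empty bundle: there is no window to record.
        return None
    return max(element_windows, key=lambda w: w.end)
```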
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86693=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86693 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m
Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178611376

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
I got it, the if. Done!

Issue Time Tracking
---

Worklog Id: (was: 86693)
Time Spent: 10h 50m (was: 10h 40m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86692=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86692 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m
Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178610974

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
Both parts are needed.
`# For non-empty bundles, store the window of the max EOW.` was part of the initial PR.
`# TODO(mariagh): Move...` was added as a review request.

Issue Time Tracking
---

Worklog Id: (was: 86692)
Time Spent: 10h 40m (was: 10.5h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86690=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86690 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 18:23
Start Date: 02/Apr/18 18:23
Worklog Time Spent: 10m
Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178611376

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
I got it, the if. Done!

Issue Time Tracking
---

Worklog Id: (was: 86690)
Time Spent: 10.5h (was: 10h 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86689=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86689 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 02/Apr/18 18:22 Start Date: 02/Apr/18 18:22 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178611141 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
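The per-transform traversal described in the docstring above (look up the side inputs produced by a transform, update each one, and collect the tasks that become unblocked) can be sketched in isolation. This is an illustrative approximation only: `TransformIndex`, `block` and `_unblock_for_side_input` are hypothetical names, side inputs, producers and tasks are plain strings, and watermarks are plain integers rather than Beam watermark objects.

```python
import collections


class TransformIndex:
    """Hypothetical helper mapping a producing transform to its side inputs."""

    def __init__(self, side_inputs):
        # side_inputs: iterable of (side_input, producer) pairs.
        self._transform_to_side_inputs = collections.defaultdict(list)
        # side input -> list of (task, block_until) pairs still waiting.
        self._blocked = collections.defaultdict(list)
        for side, producer in side_inputs:
            self._transform_to_side_inputs[producer].append(side)

    def block(self, side, task, block_until):
        """Registers a task that waits until the watermark reaches block_until."""
        self._blocked[side].append((task, block_until))

    def _unblock_for_side_input(self, side, watermark):
        """Splits waiting tasks into released and still-blocked ones."""
        waiting = self._blocked[side]
        ready = [task for task, until in waiting if watermark >= until]
        self._blocked[side] = [
            (task, until) for task, until in waiting if watermark < until]
        return ready

    def update_watermarks_for_transform(self, producer, watermark):
        """Returns every task released by this transform's watermark update."""
        unblocked_tasks = []
        for side in self._transform_to_side_inputs[producer]:
            unblocked_tasks.extend(
                self._unblock_for_side_input(side, watermark))
        return unblocked_tasks
```

Because the index is a `defaultdict(list)`, a transform that produces no side inputs simply yields an empty list of unblocked tasks, which keeps the caller free of special cases.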
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86688=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86688 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 18:21
Start Date: 02/Apr/18 18:21
Worklog Time Spent: 10m
Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178610974

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
Both parts are needed.
`# For non-empty bundles, store the window of the max EOW.` was part of the initial PR.
`# TODO(mariagh): Move...` was added as a review request.

Issue Time Tracking
---

Worklog Id: (was: 86688)
Time Spent: 10h 10m (was: 10h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86665 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 02/Apr/18 17:25 Start Date: 02/Apr/18 17:25 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178595803 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86664 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 02/Apr/18 17:25
Start Date: 02/Apr/18 17:25
Worklog Time Spent: 10m
Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949#discussion_r178595922

## File path: sdks/python/apache_beam/runners/direct/executor.py ##
@@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:

Review comment:
Please remove commented out code.

Issue Time Tracking
---

Worklog Id: (was: 86664)
Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86335=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86335 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 01/Apr/18 02:02 Start Date: 01/Apr/18 02:02 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178446105 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86333 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 23:00 Start Date: 31/Mar/18 23:00 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178444045 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unblocks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
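For readers following the diff above, the blocking logic of get_value_or_block_until_ready can be approximated with a small self-contained sketch. This is not the Beam implementation: ToyTask, ToyView and ToyContainer are hypothetical stand-ins, set_watermark is an invented helper, watermarks are plain numbers rather than objects with an input_watermark attribute, and the returned value is a plain list copy of the elements instead of a SideInputMap.

```python
import collections
import threading


class ToyTask(object):
  """Hypothetical stand-in for a TransformExecutor task."""

  def __init__(self, name):
    self.name = name
    self.blocked = False


class ToyView(object):
  """Hypothetical stand-in for _SideInputView."""

  def __init__(self):
    self.blocked_tasks = collections.deque()
    self.elements = []
    self.value = None
    self.watermark = None  # A plain number here; None until first update.


class ToyContainer(object):
  """Hypothetical stand-in for _SideInputsContainer."""

  def __init__(self, side_inputs):
    self._lock = threading.Lock()
    self._views = {side: ToyView() for side in side_inputs}

  def add_values(self, side_input, values):
    with self._lock:
      self._views[side_input].elements.extend(values)

  def set_watermark(self, side_input, watermark):
    # Invented helper for this sketch; the PR updates watermarks elsewhere.
    with self._lock:
      self._views[side_input].watermark = watermark

  def get_value_or_block_until_ready(self, side_input, task, block_until):
    with self._lock:
      view = self._views[side_input]
      if view.watermark is not None and view.watermark >= block_until:
        # The side input is ready: materialize and return its value.
        view.value = list(view.elements)
        return view.value
      # Not ready yet: remember the task and the timestamp it waits for.
      view.blocked_tasks.append((task, block_until))
      task.blocked = True
      return None
```

A caller would treat a None result as "the task has been parked" and a non-None result as the side input value to use right away.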
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86274=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86274 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:43 Start Date: 31/Mar/18 07:43 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425977 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unblocks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86271 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:20 Start Date: 31/Mar/18 07:20 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425447 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values): Raises: ValueError: If values cannot be converted into the requested form. """ -return sideinputs.SideInputMap(type(view), view._view_options(), values) +return sideinputs.SideInputMap(type(side_input), Review comment: That method was existing code. I simply changed, for consistency, the name of a variable that appears to be an _UnpickledSideInput in the uses I gave it (and currently given by the project). But perhaps its designer intended a more generic scope. I can't tell. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86271) Time Spent: 8h 50m (was: 8h 40m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 8h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. 
> Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86272=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86272 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:20 Start Date: 31/Mar/18 07:20 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425556 ## File path: sdks/python/apache_beam/runners/direct/executor.py ## @@ -271,6 +272,13 @@ def __init__(self, transform_evaluator_registry, evaluation_context, self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle +# For non-empty bundles, store the window of the max EOW. +# TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues +if input_bundle._elements: Review comment: Indeed better. Thanks! Issue Time Tracking --- Worklog Id: (was: 86272) Time Spent: 9h (was: 8h 50m)
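The executor.py hunk quoted above only shows the comment "store the window of the max EOW" and the non-empty check, so the following is a rough sketch of what picking that window could look like, not the code from the PR. Window, Element and latest_window are hypothetical names; the sketch assumes each element carries a list of windows and that each window exposes an end timestamp.

```python
import collections

# Hypothetical minimal shapes for this sketch only.
Window = collections.namedtuple('Window', ['start', 'end'])
Element = collections.namedtuple('Element', ['value', 'windows'])


def latest_window(elements):
  """Returns the window with the largest end, or None for an empty bundle."""
  windows = [w for element in elements for w in element.windows]
  if not windows:
    return None
  return max(windows, key=lambda w: w.end)
```

The end of the chosen window is a natural candidate for the block_until timestamp that a task passes when it waits on a side input, although the quoted hunk does not show that wiring.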
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86268 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:18 Start Date: 31/Mar/18 07:18 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425519 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and Review comment: The first line of the docstring is a summary. The next two are a more detailed explanation. Then below I write the function's returns. 
However, I have changed the word 'return' from L90 to 'gets', so that it doesn't sound repetitive. Issue Time Tracking --- Worklog Id: (was: 86268) Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86270 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:18 Start Date: 31/Mar/18 07:18 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425519 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and Review comment: The first line of the docstring is a summary. The next two are a more detailed explanation. Then below I write the function's returns. 
However, I have changed the word 'returns' from L90 to 'gets', so that it doesn't sound repetitive. Issue Time Tracking --- Worklog Id: (was: 86270) Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86269 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:18 Start Date: 31/Mar/18 07:18 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425519 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and Review comment: The first line of the docstring is a summary. The next two are a more detailed explanation. Then below I write the function's returns. 
However, I have changed the word 'return' from L90 to 'gets', so that it doesn't sound repetitive. Issue Time Tracking --- Worklog Id: (was: 86269) Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86265 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:15 Start Date: 31/Mar/18 07:15 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425454 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unblocks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
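The quoted hunk is cut off before the body of _update_watermarks_for_side_input_and_unblock_tasks, so its actual behavior is not visible here. As an illustration only, the idea of releasing tasks once the watermark reaches their block_until timestamp could be sketched as below. unblock_ready_tasks is a hypothetical helper, tasks are represented by plain strings, and the watermark is a plain number; none of this is taken from the PR.

```python
import collections


def unblock_ready_tasks(blocked_tasks, watermark):
  """Splits blocked (task, block_until) pairs by the new watermark.

  Returns the tasks whose block_until is at or below the watermark and
  leaves the remaining pairs in blocked_tasks, preserving their order.
  """
  unblocked = []
  still_blocked = collections.deque()
  for task, block_until in blocked_tasks:
    if watermark >= block_until:
      unblocked.append(task)
    else:
      still_blocked.append((task, block_until))
  # Mutate the deque in place so the caller's reference stays valid.
  blocked_tasks.clear()
  blocked_tasks.extend(still_blocked)
  return unblocked
```

In a real container this would run under the same lock that guards the views, and each released task would also have its blocked flag reset before being rescheduled.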
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86266 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:15 Start Date: 31/Mar/18 07:15 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425458 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 86266) Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86264=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86264 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:15 Start Date: 31/Mar/18 07:15 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425450 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86263=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86263 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 07:14 Start Date: 31/Mar/18 07:14 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178425447 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values): Raises: ValueError: If values cannot be converted into the requested form. """ -return sideinputs.SideInputMap(type(view), view._view_options(), values) +return sideinputs.SideInputMap(type(side_input), Review comment: That method was existing code. I simply changed the names of a variable that appears to be an _UnpickledSideInput in the uses I gave it (and currently given by the project). But perhaps its designer intended a more generic scope. I can't tell. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86263) Time Spent: 7h 40m (was: 7.5h) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 7h 40m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. 
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86226=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86226 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178409496 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked Review comment: You can use ``SideInputMap`` notation for these to properly render in the docs. (You do not need to do this in this PR. As a future reference.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86226) Time Spent: 6h 50m (was: 6h 40m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 6h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
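The notation suggested in the review comment above could look roughly like the following. This is only a sketch: the docstring wording is taken from the diff under review, with the type names wrapped in double backticks so that a reStructuredText-based doc generator would render them as inline literals. The function is shown free-standing with a placeholder body, which is an assumption for illustration; the real method lives on _SideInputsContainer.

```python
def get_value_or_block_until_ready(side_input, task, block_until):
  """Returns the value of a view whose task is unblocked or blocks its task.

  Args:
    side_input: ``_UnpickledSideInput`` value.
    task: ``TransformExecutor`` task waiting on a side input.
    block_until: Timestamp after which the task gets unblocked.

  Returns:
    The ``SideInputMap`` value of a view when the tasks it blocks are
    unblocked. Otherwise, None.
  """
  # Placeholder body (assumption): the actual logic is not reproduced here.
  return None
```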
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86228 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178409693 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86229=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86229 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178411799 ## File path: sdks/python/apache_beam/runners/direct/executor.py ## @@ -271,6 +272,13 @@ def __init__(self, transform_evaluator_registry, evaluation_context, self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle +# For non-empty bundles, store the window of the max EOW. +# TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues +if input_bundle._elements: Review comment: `if input_bundle.has_elements():` ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86229) Time Spent: 7h 20m (was: 7h 10m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 7h 20m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. 
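The accessor suggested in the review comment above can be illustrated with a small stand-alone sketch. BundleSketch and describe are hypothetical names made up for this example and are not Beam's bundle class; the point is only that callers ask the bundle whether it has elements instead of reaching into its private _elements attribute.

```python
class BundleSketch(object):
  """Hypothetical stand-in for a bundle; not Beam's own class."""

  def __init__(self, elements=None):
    # Private storage that callers should not touch directly.
    self._elements = list(elements or [])

  def has_elements(self):
    """Returns True if the bundle holds at least one element."""
    return len(self._elements) > 0


def describe(bundle):
  # Uses the public accessor rather than `if bundle._elements:`.
  if bundle.has_elements():
    return 'non-empty'
  return 'empty'
```

Keeping the check behind a method means the internal representation of the elements can change later without touching the callers.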
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86230=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86230 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178410118 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The (SideInputMap) value of a view when the tasks it blocks are unblocked + Otherwise, None. +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view,
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86227 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178410646 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values): Raises: ValueError: If values cannot be converted into the requested form. """ -return sideinputs.SideInputMap(type(view), view._view_options(), values) +return sideinputs.SideInputMap(type(side_input), Review comment: args mentions that side_input is _UnpickledSideInput object. If that is known why do we need to do type(side_Input) here? Could the type be not _UnpickledSideInput? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86227) Time Spent: 7h (was: 6h 50m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 7h > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. 
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86231=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86231 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 31/Mar/18 00:19 Start Date: 31/Mar/18 00:19 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178408890 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and Review comment: Why there are statements about what this method returns? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86231) Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86208 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:26 Start Date: 30/Mar/18 23:26 Worklog Time Spent: 10m Work Description: mariapython commented on issue #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#issuecomment-377645126 Thanks, Charles. @aaltay: Can you please merge after tests pass? Issue Time Tracking --- Worklog Id: (was: 86208) Time Spent: 6h 40m (was: 6.5h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86205=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86205 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:25 Start Date: 30/Mar/18 23:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178405654 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view, returns the associated value in
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86206=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86206 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:25 Start Date: 30/Mar/18 23:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178405693 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 86206) Time Spent: 6h 20m (was: 6h 10m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 6h 20m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
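A minimal sketch of the blocking behaviour described by the get_value_or_block_until_ready docstring quoted above, assuming one plain numeric watermark per side input. SketchTask, SketchSideInputs and update_watermark_and_unblock_tasks are hypothetical names used only for illustration. The body of _update_watermarks_for_side_input_and_unblock_tasks is not shown in the quoted hunks, so the unblocking logic here is an assumption, not the PR's implementation.

```python
import collections
import threading


class SketchTask(object):
  """Hypothetical stand-in for a TransformExecutor task."""

  def __init__(self, name):
    self.name = name
    self.blocked = False


class SketchSideInputs(object):
  """Simplified stand-in for _SideInputsContainer (illustration only)."""

  def __init__(self, side_inputs):
    self._lock = threading.Lock()
    self._elements = {side: [] for side in side_inputs}
    # Assumption: a watermark is a plain number; None means "not seen yet".
    self._watermarks = {side: None for side in side_inputs}
    self._blocked = collections.defaultdict(list)

  def add_values(self, side_input, values):
    with self._lock:
      self._elements[side_input].extend(values)

  def get_value_or_block_until_ready(self, side_input, task, block_until):
    """Returns the elements if the watermark reached block_until, else None."""
    with self._lock:
      watermark = self._watermarks[side_input]
      if watermark is not None and watermark >= block_until:
        return list(self._elements[side_input])
      # Not ready: remember the task together with the timestamp it waits for.
      self._blocked[side_input].append((task, block_until))
      task.blocked = True
      return None

  def update_watermark_and_unblock_tasks(self, side_input, watermark):
    """Records the new watermark and returns the tasks it unblocks."""
    with self._lock:
      self._watermarks[side_input] = watermark
      unblocked, still_blocked = [], []
      for task, block_until in self._blocked[side_input]:
        if watermark >= block_until:
          task.blocked = False
          unblocked.append(task)
        else:
          still_blocked.append((task, block_until))
      self._blocked[side_input] = still_blocked
      return unblocked
```

In this sketch a task asking for the side input at timestamp 10 stays blocked while the watermark is at 5 and is handed back for rescheduling once the watermark reaches 10.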
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86207=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86207 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:25 Start Date: 30/Mar/18 23:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178405734 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -144,8 +144,10 @@ def _refresh_watermarks(self, applied_ptransform, side_inputs_container): for consumer in consumers: unblocked_tasks.extend( self._refresh_watermarks(consumer, side_inputs_container)) + # notify the side_inputs_container Review comment: Done. Issue Time Tracking --- Worklog Id: (was: 86207) Time Spent: 6.5h (was: 6h 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86203=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86203 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:24 Start Date: 30/Mar/18 23:24 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178405564 ## File path: sdks/python/apache_beam/runners/direct/executor.py ## @@ -271,6 +272,12 @@ def __init__(self, transform_evaluator_registry, evaluation_context, self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle +# For non-empty bundles, store the window of the max EOW Review comment: Done Issue Time Tracking --- Worklog Id: (was: 86203) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86204=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86204 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 23:24 Start Date: 30/Mar/18 23:24 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178405629 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view, returns the associated value in
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86081=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86081 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331816 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view, returns the associated value
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86083=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86083 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331820 ## File path: sdks/python/apache_beam/runners/direct/executor.py ## @@ -288,11 +295,16 @@ def call(self): scoped_metrics_container = ScopedMetricsContainer(metrics_container) for side_input in self._applied_ptransform.side_inputs: + # Find the projection of main's window onto the side input's window Review comment: Period at end of sentence. Issue Time Tracking --- Worklog Id: (was: 86083) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86085 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331814 ## File path: sdks/python/apache_beam/runners/direct/executor.py ## @@ -271,6 +272,12 @@ def __init__(self, transform_evaluator_registry, evaluation_context, self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle +# For non-empty bundles, store the window of the max EOW Review comment: Period at end of sentence. Can you add a TODO to modify the Bundle class to retrieve this more efficiently? Issue Time Tracking --- Worklog Id: (was: 86085) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86082=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86082 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331817 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). Review comment: Do you need quotes around "its"? Can you clarify that the return value is a `sideinputs.SideInputMap`? Issue Time Tracking --- Worklog Id: (was: 86082) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86086=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86086 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331819 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -144,8 +144,10 @@ def _refresh_watermarks(self, applied_ptransform, side_inputs_container): for consumer in consumers: unblocked_tasks.extend( self._refresh_watermarks(consumer, side_inputs_container)) + # notify the side_inputs_container Review comment: Please fix capitalization, period end of sentence. Issue Time Tracking --- Worklog Id: (was: 86086)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86084=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86084 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 30/Mar/18 17:20 Start Date: 30/Mar/18 17:20 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#discussion_r178331815 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -69,67 +69,109 @@ class _SideInputsContainer(object): to a side input. """ - def __init__(self, views): + def __init__(self, side_inputs): self._lock = threading.Lock() self._views = {} -self._transform_to_views = collections.defaultdict(list) +self._transform_to_side_inputs = collections.defaultdict(list) +self._side_input_to_blocked_tasks = collections.defaultdict(list) -for view in views: - self._views[view] = _SideInputView(view) - self._transform_to_views[view.pvalue.producer].append(view) +for side in side_inputs: + self._views[side] = _SideInputView(side) + self._transform_to_side_inputs[side.pvalue.producer].append(side) def __repr__(self): views_string = (', '.join(str(elm) for elm in self._views.values()) if self._views.values() else '[]') return '_SideInputsContainer(_views=%s)' % views_string - def get_value_or_schedule_after_output(self, side_input, task): + def get_value_or_block_until_ready(self, side_input, task, block_until): +"""Returns the value of a view whose task is unblocked or blocks its task. + +It returns the value of a view whose watermark has been updated and +surpasses a given value. + +Args: + side_input: (_UnpickledSideInput) value. + task: (TransformExecutor) task waiting on a side input. + block_until: Timestamp after which the task gets unblocked. 
+ +Returns: + The value of a view when 'its' task is unblocked (otherwise, None). +""" with self._lock: view = self._views[side_input] - if not view.has_result: -view.callable_queue.append(task) + if view.watermark and view.watermark.input_watermark >= block_until: +view.value = self._pvalue_to_value(side_input, view.elements) +return view.value + else: +view.blocked_tasks.append((task, block_until)) task.blocked = True - return (view.has_result, view.value) def add_values(self, side_input, values): with self._lock: view = self._views[side_input] - assert not view.has_result view.elements.extend(values) - def finalize_value_and_get_tasks(self, side_input): -with self._lock: - view = self._views[side_input] - assert not view.has_result - assert view.value is None - assert view.callable_queue is not None - view.value = self._pvalue_to_value(side_input, view.elements) - view.elements = None - result = tuple(view.callable_queue) - for task in result: -task.blocked = False - view.callable_queue = None - view.has_result = True - return result - - def update_watermarks_for_transform(self, ptransform, watermark): -# Collect tasks that get unblocked as the workflow progresses. -unblocked_tasks = [] -for view in self._transform_to_views[ptransform]: - unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) -return unblocked_tasks + def update_watermarks_for_transform_and_unblock_tasks(self, +ptransform, +watermark): +"""Updates _SideInputsContainer after a watermark update and unbloks tasks. + +It traverses the list of side inputs per PTransform and calls +_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks. + +Args: + ptransform: Value of a PTransform. + watermark: Value of the watermark after an update for a PTransform. - def _update_watermarks_for_view(self, view, watermark): +Returns: + Tasks that get unblocked as a result of the watermark advancing. 
+""" unblocked_tasks = [] -if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: - unblocked_tasks = self.finalize_value_and_get_tasks(view) +for side in self._transform_to_side_inputs[ptransform]: + unblocked_tasks.extend( + self._update_watermarks_for_side_input_and_unblock_tasks( + side, watermark)) return unblocked_tasks - def _pvalue_to_value(self, view, values): -"""Given a side input view, returns the associated value
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=84671=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84671 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 27/Mar/18 02:29 Start Date: 27/Mar/18 02:29 Worklog Time Spent: 10m Work Description: mariapython commented on issue #4949: [BEAM-3818] WIP: Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949#issuecomment-376375686 R: @charlesccychen Issue Time Tracking --- Worklog Id: (was: 84671) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=84332=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84332 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 26/Mar/18 12:37 Start Date: 26/Mar/18 12:37 Worklog Time Spent: 10m Work Description: mariapython opened a new pull request #4949: [BEAM-3818] WIP: Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) URL: https://github.com/apache/beam/pull/4949 - [x] Unblock tasks as the _SideInputsContainer gets updated. - [ ] Add more tests. Issue Time Tracking --- Worklog Id: (was: 84332) Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81917=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81917 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 19/Mar/18 16:51 Start Date: 19/Mar/18 16:51 Worklog Time Spent: 10m Work Description: aaltay closed pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 46176c9e969..d0ab55f5462 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -56,6 +56,11 @@ def __init__(self, view): self.value = None self.has_result = False + def __repr__(self): +elements_string = (', '.join(str(elm) for elm in self.elements) + if self.elements else '[]') +return '_SideInputView(elements=%s)' % elements_string + class _SideInputsContainer(object): """An in-process container for side inputs. 
@@ -67,8 +72,16 @@ class _SideInputsContainer(object): def __init__(self, views): self._lock = threading.Lock() self._views = {} +self._transform_to_views = collections.defaultdict(list) + for view in views: self._views[view] = _SideInputView(view) + self._transform_to_views[view.pvalue.producer].append(view) + + def __repr__(self): +views_string = (', '.join(str(elm) for elm in self._views.values()) +if self._views.values() else '[]') +return '_SideInputsContainer(_views=%s)' % views_string def get_value_or_schedule_after_output(self, side_input, task): with self._lock: @@ -99,6 +112,19 @@ def finalize_value_and_get_tasks(self, side_input): view.has_result = True return result + def update_watermarks_for_transform(self, ptransform, watermark): +# Collect tasks that get unblocked as the workflow progresses. +unblocked_tasks = [] +for view in self._transform_to_views[ptransform]: + unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) +return unblocked_tasks + + def _update_watermarks_for_view(self, view, watermark): +unblocked_tasks = [] +if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: + unblocked_tasks = self.finalize_value_and_get_tasks(view) +return unblocked_tasks + def _pvalue_to_value(self, view, values): """Given a side input view, returns the associated value in requested form. 
@@ -149,10 +175,10 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms, self._pcollection_to_views[view.pvalue].append(view) self._transform_keyed_states = self._initialize_keyed_states( root_transforms, value_to_consumers) +self._side_inputs_container = _SideInputsContainer(views) self._watermark_manager = WatermarkManager( clock, root_transforms, value_to_consumers, self._transform_keyed_states) -self._side_inputs_container = _SideInputsContainer(views) self._pending_unblocked_tasks = [] self._counter_factory = counters.CounterFactory() self._metrics = DirectMetrics() @@ -199,9 +225,6 @@ def handle_result( committed_bundles, unprocessed_bundles = self._commit_bundles( result.uncommitted_output_bundles, result.unprocessed_bundles) - self._watermark_manager.update_watermarks( - completed_bundle, result.transform, completed_timers, - committed_bundles, unprocessed_bundles, result.keyed_watermark_holds) self._metrics.commit_logical(completed_bundle, result.logical_metric_updates) @@ -217,11 +240,13 @@ def handle_result( self._side_inputs_container.add_values( view, committed_bundle.get_elements_iterable(make_copy=True)) - if (self.get_execution_context(result.transform) - .watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF): -self._pending_unblocked_tasks.extend( -self._side_inputs_container.finalize_value_and_get_tasks(view)) + + # Tasks generated from unblocked side inputs as the watermark progresses. + tasks = self._watermark_manager.update_watermarks( + completed_bundle, result.transform, completed_timers, + committed_bundles,
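For readers following the diff in PR #4838 above, here is a minimal standalone sketch of the idea behind update_watermarks_for_transform: the container keeps a map from each producing transform to the views it feeds, and releases blocked tasks once that transform's input watermark reaches positive infinity. This is an illustration only, not the Beam implementation. The class names without leading underscores, the block() helper, the string keys, and the float('inf') stand-in for WatermarkManager.WATERMARK_POS_INF are all assumptions made for the example.

```python
import collections

# Assumption: stand-in for WatermarkManager.WATERMARK_POS_INF in the diff.
WATERMARK_POS_INF = float('inf')


class SideInputView(object):
  """Hypothetical, simplified counterpart of _SideInputView."""

  def __init__(self, name):
    self.name = name
    self.blocked_tasks = []
    self.has_result = False


class SideInputsContainer(object):
  """Hypothetical, simplified counterpart of _SideInputsContainer."""

  def __init__(self, views_by_producer):
    # views_by_producer: dict mapping a producer name to its view names.
    self._views = {}
    self._transform_to_views = collections.defaultdict(list)
    for producer, names in views_by_producer.items():
      for name in names:
        self._views[name] = SideInputView(name)
        self._transform_to_views[producer].append(name)

  def block(self, view_name, task):
    # Hypothetical helper: park a task until the view is finalized.
    self._views[view_name].blocked_tasks.append(task)

  def finalize_value_and_get_tasks(self, view_name):
    # Mark the view as complete and hand back the tasks waiting on it.
    view = self._views[view_name]
    view.has_result = True
    tasks, view.blocked_tasks = view.blocked_tasks, []
    return tasks

  def update_watermarks_for_transform(self, producer, input_watermark):
    # Collect tasks that get unblocked as the watermark progresses.
    unblocked = []
    for view_name in self._transform_to_views[producer]:
      if input_watermark == WATERMARK_POS_INF:
        unblocked.extend(self.finalize_value_and_get_tasks(view_name))
    return unblocked
```

In this sketch a call with a finite watermark returns an empty list, and the first call with WATERMARK_POS_INF returns every task parked on the views of that producer.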
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81724=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81724 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 19/Mar/18 05:11 Start Date: 19/Mar/18 05:11 Worklog Time Spent: 10m Work Description: mariapython commented on issue #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#issuecomment-374102961 @lukecwik: I see. Is there a way to squash remotely, i.e. Jenkins would know that you are only putting all commits under one name? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81724) Time Spent: 4.5h (was: 4h 20m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 4.5h > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81697=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81697 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 18/Mar/18 22:31 Start Date: 18/Mar/18 22:31 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#issuecomment-374058795 @mariapython Anytime you make a change to the PR, the tests are rerun. Github/Jenkins doesn't know that your new single squashed commit is equivalent to your old commits. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81697) Time Spent: 4h 20m (was: 4h 10m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 4h 20m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81696=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81696 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 18/Mar/18 22:23 Start Date: 18/Mar/18 22:23 Worklog Time Spent: 10m Work Description: mariapython commented on issue #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#issuecomment-374057693 OK, I just squashed the commits and pushed. I am not sure why that triggered tests again... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81696) Time Spent: 4h 10m (was: 4h) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 4h 10m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81685=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81685 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 18/Mar/18 20:07 Start Date: 18/Mar/18 20:07 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175301812 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -217,11 +242,13 @@ def handle_result( self._side_inputs_container.add_values( view, committed_bundle.get_elements_iterable(make_copy=True)) - if (self.get_execution_context(result.transform) - .watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF): -self._pending_unblocked_tasks.extend( -self._side_inputs_container.finalize_value_and_get_tasks(view)) + + # Tasks generated from unblocked side inputs as the watermark progresses. + tasks = self._watermark_manager.update_watermarks( + completed_bundle, result.transform, completed_timers, + committed_bundles, unprocessed_bundles, result.keyed_watermark_holds, + self._side_inputs_container) + self._pending_unblocked_tasks.extend(tasks) Review comment: Makes sense. Thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81685) Time Spent: 4h (was: 3h 50m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 4h > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81603=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81603 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:28 Start Date: 17/Mar/18 19:28 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267711 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -217,11 +242,13 @@ def handle_result( self._side_inputs_container.add_values( view, committed_bundle.get_elements_iterable(make_copy=True)) - if (self.get_execution_context(result.transform) - .watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF): -self._pending_unblocked_tasks.extend( -self._side_inputs_container.finalize_value_and_get_tasks(view)) + + # Tasks generated from unblocked side inputs as the watermark progresses. + tasks = self._watermark_manager.update_watermarks( + completed_bundle, result.transform, completed_timers, + committed_bundles, unprocessed_bundles, result.keyed_watermark_holds, + self._side_inputs_container) + self._pending_unblocked_tasks.extend(tasks) Review comment: I pushed all the changes earlier today and was going to say the same (the update needs to be regardless). That is why this was not modified in my last commit. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81603) Time Spent: 3h 40m (was: 3.5h) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81604=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81604 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:28 Start Date: 17/Mar/18 19:28 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267611 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform): def update_watermarks(self, completed_committed_bundle, applied_ptransform, completed_timers, outputs, unprocessed_bundles, -keyed_earliest_holds): +keyed_earliest_holds, side_inputs_container): Review comment: I already pushed the change earlier, sorry I forgot to comment here. Actually what I don't need is the attribute, so I got rid of it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81604) Time Spent: 3h 50m (was: 3h 40m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. 
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81602=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81602 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:25 Start Date: 17/Mar/18 19:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267622 ## File path: sdks/python/apache_beam/testing/test_stream_test.py ## @@ -245,6 +247,84 @@ def fired_elements(elem): # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. self.assertEqual([('k', ['a'])], result) + def test_basic_execution_sideinputs_batch(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e'])) +side = (p +| beam.Create([2, 1, 4]) +| beam.Map(lambda t: window.TimestampedValue(t, t))) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = (main_stream # pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) + | beam.Map(recorded_elements)) +p.run() + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + + def test_basic_execution_sideinputs(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. 
+global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e']) + .advance_processing_time(11)) +side_stream = (p + | 'side TestStream' >> TestStream() + .add_elements([window.TimestampedValue(2, 2)]) + .add_elements([window.TimestampedValue(1, 1)]) + .add_elements([window.TimestampedValue(4, 4)])) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = (main_stream# pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream)) + | beam.Map(recorded_elements)) + +p.run() + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + Review comment: Will do This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81602) Time Spent: 3.5h (was: 3h 20m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3.5h > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. 
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81600=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81600 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:25 Start Date: 17/Mar/18 19:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267611 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform): def update_watermarks(self, completed_committed_bundle, applied_ptransform, completed_timers, outputs, unprocessed_bundles, -keyed_earliest_holds): +keyed_earliest_holds, side_inputs_container): Review comment: I already pushed the change earlier, sorry I forgot to comment here. Actually what I don't need is the attribute, since I got rid of it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81600) Time Spent: 3h 10m (was: 3h) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3h 10m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. 
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81601=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81601 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:25 Start Date: 17/Mar/18 19:25 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267616 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform, if input_committed_bundle and input_committed_bundle.has_elements(): completed_tw.remove_pending(input_committed_bundle) - def _refresh_watermarks(self, applied_ptransform): + def _refresh_watermarks(self, applied_ptransform, side_inputs_container): Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81601) Time Spent: 3h 20m (was: 3h 10m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3h 20m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81597=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81597 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:10 Start Date: 17/Mar/18 19:10 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267215 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform, if input_committed_bundle and input_committed_bundle.has_elements(): completed_tw.remove_pending(input_committed_bundle) - def _refresh_watermarks(self, applied_ptransform): + def _refresh_watermarks(self, applied_ptransform, side_inputs_container): Review comment: You're right--Maria, can you fix this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81597) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81598=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81598 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:10 Start Date: 17/Mar/18 19:10 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267206 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -217,11 +242,13 @@ def handle_result( self._side_inputs_container.add_values( view, committed_bundle.get_elements_iterable(make_copy=True)) - if (self.get_execution_context(result.transform) - .watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF): -self._pending_unblocked_tasks.extend( -self._side_inputs_container.finalize_value_and_get_tasks(view)) + + # Tasks generated from unblocked side inputs as the watermark progresses. + tasks = self._watermark_manager.update_watermarks( + completed_bundle, result.transform, completed_timers, + committed_bundles, unprocessed_bundles, result.keyed_watermark_holds, + self._side_inputs_container) + self._pending_unblocked_tasks.extend(tasks) Review comment: I think we need it to be more general than only if the current bundle is a view. Any watermark update may trigger downstream views to be updated--for example, say my windowing is Fixed(10), and I have a single element at time t=5. When the upstream watermark hits t=10, there may not be a bundle processed by the transform directly going into the view, so we do need this in the full generality, since the watermark update of the previous step could update the watermark of the transform emitting the view. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81598) Time Spent: 3h (was: 2h 50m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 3h > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
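The Fixed(10) scenario in the review comment above can be sketched in a few lines of plain Python. The sketch assumes a deliberately simplified model: a side input window counts as ready once the watermark reaches the end of the window, and nothing else (allowed lateness, triggers, Beam's real window and timestamp types) is modeled. The names fixed_window, WindowedSideInput, block_until_window_closes and advance_watermark are hypothetical and do not exist in Beam.

```python
def fixed_window(timestamp, size):
  """Returns the (start, end) of the fixed window containing timestamp."""
  start = timestamp - (timestamp % size)
  return (start, start + size)


class WindowedSideInput(object):
  """Hypothetical model of a side input gated on a watermark."""

  def __init__(self, size):
    self._size = size
    self._watermark = float('-inf')
    self._blocked = []  # list of (window_end, task) pairs

  def block_until_window_closes(self, element_timestamp, task):
    # Returns True if the task had to be parked, False if the window
    # is already closed and the task can run right away.
    _, end = fixed_window(element_timestamp, self._size)
    if self._watermark >= end:
      return False
    self._blocked.append((end, task))
    return True

  def advance_watermark(self, watermark):
    # No bundle needs to arrive: the watermark update alone releases
    # every task whose window end has been reached.
    self._watermark = max(self._watermark, watermark)
    ready = [task for end, task in self._blocked if end <= self._watermark]
    self._blocked = [(end, task) for end, task in self._blocked
                     if end > self._watermark]
    return ready
```

With size 10 and a single element at t=5, the task is parked on window (0, 10). Advancing the watermark to 9 releases nothing, while advancing it to 10 releases the task even though no new bundle was processed by the transform feeding the view, which is the case the comment argues must be handled in full generality.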
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81596=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81596 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 19:10 Start Date: 17/Mar/18 19:10 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175267209 ## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ## @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform): def update_watermarks(self, completed_committed_bundle, applied_ptransform, completed_timers, outputs, unprocessed_bundles, -keyed_earliest_holds): +keyed_earliest_holds, side_inputs_container): Review comment: You're right--Maria, can you fix this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81596) Time Spent: 2h 50m (was: 2h 40m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81483=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81483 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 02:26 Start Date: 17/Mar/18 02:26 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838#discussion_r175245310 ## File path: sdks/python/apache_beam/testing/test_stream_test.py ## @@ -245,6 +247,84 @@ def fired_elements(elem): # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. self.assertEqual([('k', ['a'])], result) + def test_basic_execution_sideinputs_batch(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e'])) +side = (p +| beam.Create([2, 1, 4]) +| beam.Map(lambda t: window.TimestampedValue(t, t))) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = (main_stream # pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) + | beam.Map(recorded_elements)) +p.run() + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + + def test_basic_execution_sideinputs(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. 
+global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e']) + .advance_processing_time(11)) +side_stream = (p + | 'side TestStream' >> TestStream() + .add_elements([window.TimestampedValue(2, 2)]) + .add_elements([window.TimestampedValue(1, 1)]) + .add_elements([window.TimestampedValue(4, 4)])) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = (main_stream# pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream)) + | beam.Map(recorded_elements)) + +p.run() + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + Review comment: For the follow up PR, please add some non-trivial side input tests. (Suggestions: A test with partial side inputs in the accessible window, another with more main elements.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81483) Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81485=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81485 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 02:26
    Start Date: 17/Mar/18 02:26
    Worklog Time Spent: 10m
    Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances)
    URL: https://github.com/apache/beam/pull/4838#discussion_r175244992

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -217,11 +242,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view, committed_bundle.get_elements_iterable(make_copy=True))
-            if (self.get_execution_context(result.transform)
-                .watermarks.input_watermark
-                == WatermarkManager.WATERMARK_POS_INF):
-              self._pending_unblocked_tasks.extend(
-                  self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)

Review comment:
Should this stay inside the if condition? Since only when a view is updated, it will be necessary to check for unblocked tasks.

Issue Time Tracking
---
    Worklog Id: (was: 81485)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81484=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81484 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 02:26
    Start Date: 17/Mar/18 02:26
    Worklog Time Spent: 10m
    Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances)
    URL: https://github.com/apache/beam/pull/4838#discussion_r175245107

## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ##

@@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform,
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)
 
-  def _refresh_watermarks(self, applied_ptransform):
+  def _refresh_watermarks(self, applied_ptransform, side_inputs_container):

Review comment:
Similar to the above comment. Can you use self._side_inputs_container instead of adding this new argument here?

Issue Time Tracking
---
    Worklog Id: (was: 81484)
    Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81487=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81487 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 02:26
    Start Date: 17/Mar/18 02:26
    Worklog Time Spent: 10m
    Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances)
    URL: https://github.com/apache/beam/pull/4838#discussion_r175245092

## File path: sdks/python/apache_beam/runners/direct/watermark_manager.py ##

@@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform):
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):

Review comment:
How is this new argument `side_inputs_container` different from the one passed to the constructor above? Can you use `self._side_inputs_container` instead of adding this new argument here?

Issue Time Tracking
---
    Worklog Id: (was: 81487)
    Time Spent: 2h 40m (was: 2.5h)
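The reviewer's suggestion, reading the container from the instance rather than threading it through every call, can be sketched roughly as follows. This is a toy model under stated assumptions, not Beam's code: `WatermarkManagerSketch` and `RecordingContainer` are hypothetical stand-ins, and the method signatures are shortened for illustration.

```python
class RecordingContainer(object):
  """Hypothetical stand-in for _SideInputsContainer; it only records calls."""

  def __init__(self):
    self.updates = []

  def update_watermarks_for_transform(self, ptransform, watermark):
    self.updates.append((ptransform, watermark))
    return ['task-for-%s' % ptransform]


class WatermarkManagerSketch(object):
  """Hypothetical manager that receives the container once, at construction."""

  def __init__(self, side_inputs_container):
    self._side_inputs_container = side_inputs_container

  def update_watermarks(self, applied_ptransform, watermark):
    # No side_inputs_container parameter: the instance already holds it.
    return self._refresh_watermarks(applied_ptransform, watermark)

  def _refresh_watermarks(self, applied_ptransform, watermark):
    return self._side_inputs_container.update_watermarks_for_transform(
        applied_ptransform, watermark)
```

Keeping the dependency on the instance means callers cannot accidentally pass a different container than the one the manager was built with.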
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81486=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81486 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 02:26
    Start Date: 17/Mar/18 02:26
    Worklog Time Spent: 10m
    Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances)
    URL: https://github.com/apache/beam/pull/4838#discussion_r175244875

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -56,6 +56,12 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements_string = (
+        ', '.join(
+            str(elm) for elm in self.elements) if self.elements else '')

Review comment:
Why do we use `''` here and `[]` below to represent an empty list? (It is fine if this is fixed in a follow up PR.)

Issue Time Tracking
---
    Worklog Id: (was: 81486)
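One way to address the inconsistency raised in this comment is to route both `__repr__` methods through a single formatting helper, so an empty collection always prints the same way. The sketch below is illustrative only: `format_items`, `SideInputViewSketch` and `SideInputsContainerSketch` are hypothetical names, not the classes in `evaluation_context.py`.

```python
def format_items(items):
  """Renders items as '[a, b]'; an empty collection renders as '[]'."""
  return '[%s]' % ', '.join(str(item) for item in items)


class SideInputViewSketch(object):
  """Hypothetical stand-in for _SideInputView; models only `elements`."""

  def __init__(self):
    self.elements = []

  def __repr__(self):
    return 'SideInputViewSketch(elements=%s)' % format_items(self.elements)


class SideInputsContainerSketch(object):
  """Hypothetical stand-in for _SideInputsContainer; models only the views."""

  def __init__(self, views):
    self._views = list(views)

  def __repr__(self):
    return 'SideInputsContainerSketch(_views=%s)' % format_items(self._views)
```

With a shared helper there is no conditional expression left to drift between the two classes.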
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81482=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81482 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 01:42
    Start Date: 17/Mar/18 01:42
    Worklog Time Spent: 10m
    Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances)
    URL: https://github.com/apache/beam/pull/4838#discussion_r175244099

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
     return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:

Review comment:
Done

Issue Time Tracking
---
    Worklog Id: (was: 81482)
    Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81478=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81478 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 01:34
    Start Date: 17/Mar/18 01:34
    Worklog Time Spent: 10m
    Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
    URL: https://github.com/apache/beam/pull/4838#discussion_r175243839

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -56,6 +56,11 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements = ', '.join([
+        str(elm) for elm in self.elements] if self.elements else [''])

Review comment:
Done

Issue Time Tracking
---
    Worklog Id: (was: 81478)
    Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81477=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81477 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 01:34
    Start Date: 17/Mar/18 01:34
    Worklog Time Spent: 10m
    Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
    URL: https://github.com/apache/beam/pull/4838#discussion_r175243837

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -67,8 +72,17 @@ class _SideInputsContainer(object):
   def __init__(self, views):
     self._lock = threading.Lock()
     self._views = {}
+    self._transform_to_views = collections.defaultdict(list)
+
     for view in views:
       self._views[view] = _SideInputView(view)
+      self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+    views = ', '.join([
+        str(elm) for elm in self._views.values()
+    ] if self._views.values() else [])

Review comment:
Done

Issue Time Tracking
---
    Worklog Id: (was: 81477)
    Time Spent: 1.5h (was: 1h 20m)
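The `_transform_to_views` mapping added in this hunk groups side input views by the transform that produces them, so a watermark change on one transform can be fanned out to just its views. A minimal sketch of that indexing step is below; the `PValue` and `View` tuples and the `index_views_by_producer` helper are hypothetical stand-ins chosen for illustration, not Beam types.

```python
import collections

# Hypothetical, minimal stand-ins for a side input view and its PValue.
PValue = collections.namedtuple('PValue', ['producer'])
View = collections.namedtuple('View', ['name', 'pvalue'])


def index_views_by_producer(views):
  """Groups views by producing transform, as the hunk does in __init__."""
  transform_to_views = collections.defaultdict(list)
  for view in views:
    transform_to_views[view.pvalue.producer].append(view)
  return transform_to_views
```

Because the mapping is a `defaultdict(list)`, looking up a transform that produces no side input yields an empty list, so a loop such as `for view in mapping[ptransform]` simply does nothing for it.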
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81480=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81480 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 01:34 Start Date: 17/Mar/18 01:34 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner URL: https://github.com/apache/beam/pull/4838#discussion_r175243849 ## File path: sdks/python/apache_beam/testing/test_stream_test.py ## @@ -245,6 +247,82 @@ def fired_elements(elem): # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. self.assertEqual([('k', ['a'])], result) + def test_basic_execution_sideinputs_batch(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e'])) +side = (p +| beam.Create([2, 1, 4]) +| beam.Map(lambda t: window.TimestampedValue(t, t))) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81480) Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81479=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81479 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 17/Mar/18 01:34 Start Date: 17/Mar/18 01:34 Worklog Time Spent: 10m Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner URL: https://github.com/apache/beam/pull/4838#discussion_r175243844 ## File path: sdks/python/apache_beam/testing/test_stream_test.py ## @@ -245,6 +247,82 @@ def fired_elements(elem): # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. self.assertEqual([('k', ['a'])], result) + def test_basic_execution_sideinputs_batch(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e'])) +side = (p +| beam.Create([2, 1, 4]) +| beam.Map(lambda t: window.TimestampedValue(t, t))) + +class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): +yield (elm, ts, side) + +records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable +p.run() + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. +self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + + def test_basic_execution_sideinputs(self): + +# TODO(BEAM-3377): Remove after assert_that in streaming is fixed. 
+global result # pylint: disable=global-variable-undefined +result = [] + +def recorded_elements(elem): + result.append(elem) + return elem + +options = PipelineOptions() +options.view_as(StandardOptions).streaming = True +p = TestPipeline(options=options) + +main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e']) + .advance_processing_time(11)) +# TODO(mariagh): Fix this Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 81479) Time Spent: 1h 50m (was: 1h 40m) > Add support for the streaming side inputs in the Python DirectRunner > > > Key: BEAM-3818 > URL: https://issues.apache.org/jira/browse/BEAM-3818 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: María GH >Assignee: María GH >Priority: Minor > Fix For: 3.0.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > > The streaming DirectRunner should support streaming side input semantics. > Currently, side inputs are only available for globally-windowed side input > PCollections. > Also, empty side inputs cause a pipeline stall. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81475=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81475 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 01:33
    Start Date: 17/Mar/18 01:33
    Worklog Time Spent: 10m
    Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
    URL: https://github.com/apache/beam/pull/4838#discussion_r175243829

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -217,11 +241,12 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view, committed_bundle.get_elements_iterable(make_copy=True))
-            if (self.get_execution_context(result.transform)
-                .watermarks.input_watermark
-                == WatermarkManager.WATERMARK_POS_INF):
-              self._pending_unblocked_tasks.extend(
-                  self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      tasks = self._watermark_manager.update_watermarks(

Review comment:
Done

Issue Time Tracking
---
    Worklog Id: (was: 81475)
    Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81476 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 01:34
    Start Date: 17/Mar/18 01:34
    Worklog Time Spent: 10m
    Work Description: mariapython commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
    URL: https://github.com/apache/beam/pull/4838#discussion_r175243831

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
     return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses

Review comment:
Done

Issue Time Tracking
---
    Worklog Id: (was: 81476)
    Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81452=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81452 ]

ASF GitHub Bot logged work on BEAM-3818:

    Author: ASF GitHub Bot
    Created on: 17/Mar/18 00:13
    Start Date: 17/Mar/18 00:13
    Worklog Time Spent: 10m
    Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
    URL: https://github.com/apache/beam/pull/4838#discussion_r175239845

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##

@@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
     return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:

Review comment:
Thank you. Sounds good, let's do that.

Issue Time Tracking
---
    Worklog Id: (was: 81452)
    Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81413=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81413 ] ASF GitHub Bot logged work on BEAM-3818: Author: ASF GitHub Bot Created on: 16/Mar/18 22:33 Start Date: 16/Mar/18 22:33 Worklog Time Spent: 10m Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner URL: https://github.com/apache/beam/pull/4838#discussion_r175229449 ## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ## @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input): view.has_result = True return result + def update_watermarks_for_transform(self, ptransform, watermark): +# Collect tasks that get unblocked as the workflow progresses +unblocked_tasks = [] +for view in self._transform_to_views[ptransform]: + unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) +return unblocked_tasks + + def _update_watermarks_for_view(self, view, watermark): +unblocked_tasks = [] +if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: Review comment: Yes, this is part one of a refactor that will deliver the streaming side input functionality. I've synced with Maria, and she has the other parts of the refactor after this one goes in. Perhaps we should clarify the title of the PR though. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
Issue Time Tracking
---
Worklog Id: (was: 81413)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81410=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81410 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 16/Mar/18 22:31
Start Date: 16/Mar/18 22:31
Worklog Time Spent: 10m
Work Description: aaltay commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175229093

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##
@@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
       return result

+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:

Review comment:
Would not waiting for `WatermarkManager.WATERMARK_POS_INF` unblock side inputs only when they are completely finished?
Issue Time Tracking
---
Worklog Id: (was: 81410)
Time Spent: 40m (was: 0.5h)
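The question above turns on when blocked tasks are released. What follows is a minimal, single-threaded sketch of that idea, not the code under review. It assumes numeric watermarks and a per-task threshold: `SideInputsSketch`, `block`, and `block_until` are hypothetical names, the float sentinel stands in for `WatermarkManager.WATERMARK_POS_INF`, and the `>=` comparison is an assumption.

```python
import collections

# Stand-in for WatermarkManager.WATERMARK_POS_INF (assumption: a float sentinel).
WATERMARK_POS_INF = float('inf')


class SideInputsSketch(object):
  """Hypothetical, simplified stand-in for _SideInputsContainer."""

  def __init__(self, producer_view_pairs):
    # Map each producing transform to the views it feeds.
    self._transform_to_views = collections.defaultdict(list)
    # Per view: (block_until, task) pairs that are still waiting.
    self._blocked = collections.defaultdict(list)
    for producer, view in producer_view_pairs:
      self._transform_to_views[producer].append(view)

  def block(self, view, task, block_until=WATERMARK_POS_INF):
    # By default a task waits until the side input is completely finished.
    self._blocked[view].append((block_until, task))

  def update_watermarks_for_transform(self, producer, watermark):
    # Collect tasks that get unblocked as the workflow progresses.
    unblocked_tasks = []
    for view in self._transform_to_views[producer]:
      still_blocked = []
      for block_until, task in self._blocked[view]:
        if watermark >= block_until:  # Assumed comparison.
          unblocked_tasks.append(task)
        else:
          still_blocked.append((block_until, task))
      self._blocked[view] = still_blocked
    return unblocked_tasks


container = SideInputsSketch([('create', 'side_view')])
container.block('side_view', 'batch_task')
container.block('side_view', 'streaming_task', block_until=10)
print(container.update_watermarks_for_transform('create', 5))   # []
print(container.update_watermarks_for_transform('create', 10))  # ['streaming_task']
print(container.update_watermarks_for_transform('create', WATERMARK_POS_INF))  # ['batch_task']
```

In this sketch a task that waits for the positive-infinity watermark is released only once its side input is completely finished, which is the behavior the question points at. A task with a finite threshold is released earlier, as the watermark advances past it.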
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80995=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80995 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956108

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##
@@ -217,11 +241,12 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      tasks = self._watermark_manager.update_watermarks(

Review comment:
Please add comment that these tasks come from unblocked side inputs.

Issue Time Tracking
---
Worklog Id: (was: 80995)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80996=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80996 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956111

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##
@@ -56,6 +56,11 @@ def __init__(self, view):
     self.value = None
     self.has_result = False

+  def __repr__(self):
+    elements = ', '.join([
+        str(elm) for elm in self.elements] if self.elements else [''])

Review comment:
Can you do the following?
```
elements_string = ', '.join(str(elm) for elm in self.elements) if self.elements else ''
```

Issue Time Tracking
---
Worklog Id: (was: 80996)
Time Spent: 20m (was: 10m)
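The suggestion above swaps the list-with-fallback join for a conditional expression around a generator. A minimal sketch of the two forms side by side follows. It uses a stand-in class (`SideInputViewSketch`, `repr_original`, and `repr_suggested` are hypothetical names, not Beam code) and indicates that both render the same string for empty and non-empty element lists.

```python
class SideInputViewSketch(object):
  """Hypothetical stand-in for _SideInputView; only `elements` is modeled."""

  def __init__(self, elements=None):
    self.elements = list(elements or [])

  def repr_original(self):
    # Form from the diff: join a list, falling back to [''] when empty.
    return ', '.join(
        [str(elm) for elm in self.elements] if self.elements else [''])

  def repr_suggested(self):
    # Form from the review comment: generator expression, '' when empty.
    return (', '.join(str(elm) for elm in self.elements)
            if self.elements else '')


empty = SideInputViewSketch()
full = SideInputViewSketch([2, 1, 4])
print(repr(empty.repr_original()), repr(empty.repr_suggested()))  # '' ''
print(full.repr_original())   # 2, 1, 4
print(full.repr_suggested())  # 2, 1, 4
```

Because `str.join` over an empty iterable already returns an empty string, the `if self.elements else ''` guard only matters when a non-empty placeholder such as `'[]'` is wanted for the empty case.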
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80998=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80998 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956113

## File path: sdks/python/apache_beam/testing/test_stream_test.py ##
@@ -245,6 +247,82 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)

+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e'])
+                   .advance_processing_time(11))
+    # TODO(mariagh): Fix this

Review comment:
Please make this comment more detailed, or remove.

Issue Time Tracking
---
Worklog Id: (was: 80998)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81000=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81000 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956109

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##
@@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
       return result

+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses

Review comment:
nit: period at end of sentence.

Issue Time Tracking
---
Worklog Id: (was: 81000)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80997=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80997 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956110

## File path: sdks/python/apache_beam/runners/direct/evaluation_context.py ##
@@ -67,8 +72,17 @@ class _SideInputsContainer(object):
   def __init__(self, views):
     self._lock = threading.Lock()
     self._views = {}
+    self._transform_to_views = collections.defaultdict(list)
+
     for view in views:
       self._views[view] = _SideInputView(view)
+      self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+    views = ', '.join([
+        str(elm) for elm in self._views.values()
+    ] if self._views.values() else [])

Review comment:
Same as above for `_SideInputView.__repr__`.

Issue Time Tracking
---
Worklog Id: (was: 80997)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80999=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80999 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m
Work Description: charlesccychen commented on a change in pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956114

## File path: sdks/python/apache_beam/testing/test_stream_test.py ##
@@ -245,6 +247,82 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)

+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable

Review comment:
Can you use the multi-line form to avoid the line-too-long? You can also just avoid creating the `records` variable.
```
(main_stream
 | beam.ParDo(RecordFn, ...)
 | beam.Map(...))
```
Issue Time Tracking
---
Worklog Id: (was: 80999)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=79147=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79147 ]

ASF GitHub Bot logged work on BEAM-3818:

Author: ASF GitHub Bot
Created on: 10/Mar/18 00:39
Start Date: 10/Mar/18 00:39
Worklog Time Spent: 10m
Work Description: mariapython commented on issue #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#issuecomment-371984800

R: @charlesccychen
cc: @aaltay, @robertwb

Issue Time Tracking
---
Worklog Id: (was: 79147)
Time Spent: 10m
Remaining Estimate: 0h