[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86706
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 19:08
Start Date: 02/Apr/18 19:08
Worklog Time Spent: 10m 
  Work Description: aaltay closed pull request #4949: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part 2: unblock tasks as 
the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index d0ab55f5462..893e32e3357 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -51,10 +51,10 @@ class _SideInputView(object):
 
   def __init__(self, view):
     self._view = view
-    self.callable_queue = collections.deque()
+    self.blocked_tasks = collections.deque()
     self.elements = []
     self.value = None
-    self.has_result = False
+    self.watermark = None
 
   def __repr__(self):
     elements_string = (', '.join(str(elm) for elm in self.elements)
@@ -69,67 +69,108 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It gets the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: ``_UnpickledSideInput`` value.
+      task: ``TransformExecutor`` task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The ``SideInputMap`` value of a view when the tasks it blocks are
+      unblocked. Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.output_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform
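
The blocking read in `get_value_or_block_until_ready` can be modelled outside Beam. The following is a minimal sketch, not the DirectRunner code: `Watermark`, `Task`, `SideInputView`, `SideInputsSketch` and `set_watermark` are hypothetical stand-ins, and a plain list replaces `_pvalue_to_value`. It only illustrates the rule in the diff: a read returns a value once the view's output watermark has reached `block_until`, and otherwise queues the task as blocked.

```python
import collections
import threading


class Watermark(object):
  """Hypothetical stand-in that carries only an output watermark."""

  def __init__(self, output_watermark):
    self.output_watermark = output_watermark


class Task(object):
  """Hypothetical stand-in for a TransformExecutor task."""

  def __init__(self, name):
    self.name = name
    self.blocked = False


class SideInputView(object):
  """Per-side-input state, mirroring the fields added in the diff."""

  def __init__(self):
    self.blocked_tasks = collections.deque()
    self.elements = []
    self.value = None
    self.watermark = None


class SideInputsSketch(object):

  def __init__(self, side_inputs):
    self._lock = threading.Lock()
    self._views = {side: SideInputView() for side in side_inputs}

  def add_values(self, side_input, values):
    with self._lock:
      self._views[side_input].elements.extend(values)

  def set_watermark(self, side_input, watermark):
    # Assumption: the real container records watermarks through another path.
    with self._lock:
      self._views[side_input].watermark = watermark

  def get_value_or_block_until_ready(self, side_input, task, block_until):
    with self._lock:
      view = self._views[side_input]
      if view.watermark and view.watermark.output_watermark >= block_until:
        # Assumption: a list copy stands in for _pvalue_to_value().
        view.value = list(view.elements)
        return view.value
      # Not ready yet: remember the task and the timestamp it waits for.
      view.blocked_tasks.append((task, block_until))
      task.blocked = True
      return None


container = SideInputsSketch(['side'])
container.add_values('side', [1, 2])

early = Task('early')
first = container.get_value_or_block_until_ready('side', early, block_until=10)

container.set_watermark('side', Watermark(output_watermark=10))
late = Task('late')
second = container.get_value_or_block_until_ready('side', late, block_until=10)
```

In this sketch the first read happens before any watermark is recorded, so the task is queued; the second read happens after the watermark reaches the requested timestamp, so it gets the accumulated elements.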

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86694
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178612873
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   Done!
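
The comment in the hunk above says that, for non-empty bundles, the executor stores the window with the maximum end-of-window (EOW). A minimal sketch of that selection, assuming hypothetical `Window` and `Element` tuples rather than Beam's windowed values, and a hypothetical helper name `latest_window`:

```python
import collections

# Hypothetical stand-ins, not the Beam classes.
Window = collections.namedtuple('Window', ['start', 'end'])
Element = collections.namedtuple('Element', ['value', 'windows'])


def latest_window(elements):
  """Returns the window with the greatest end, or None for an empty bundle."""
  windows = [w for element in elements for w in element.windows]
  if not windows:
    return None
  return max(windows, key=lambda w: w.end)


bundle = [
    Element('a', (Window(0, 10),)),
    Element('b', (Window(10, 20), Window(5, 15))),
]
latest = latest_window(bundle)
empty = latest_window([])
```

Returning `None` for an empty bundle is a choice made for this sketch; the excerpt does not show how the executor treats that case.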


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86694)
Time Spent: 11h  (was: 10h 50m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: María GH
> Assignee: María GH
> Priority: Minor
> Fix For: 3.0.0
>
> Time Spent: 11h
> Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86693
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178611376
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   I got it, the if. Done!




Issue Time Tracking
---

Worklog Id: (was: 86693)
Time Spent: 10h 50m  (was: 10h 40m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86692&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86692
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:29
Start Date: 02/Apr/18 18:29
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178610974
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   Both parts are needed.
   `# For non-empty bundles, store the window of the max EOW.` was part of the 
initial PR.
   `# TODO(mariagh): Move...` was added as a review request.




Issue Time Tracking
---

Worklog Id: (was: 86692)
Time Spent: 10h 40m  (was: 10.5h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86690
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:23
Start Date: 02/Apr/18 18:23
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178611376
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   I got it, the if. Done!




Issue Time Tracking
---

Worklog Id: (was: 86690)
Time Spent: 10.5h  (was: 10h 20m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86689
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:22
Start Date: 02/Apr/18 18:22
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178611141
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view,
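
The traversal described in the `update_watermarks_for_transform_and_unblock_tasks` docstring can be sketched in isolation. The body of `_update_watermarks_for_side_input_and_unblock_tasks` is not part of this excerpt, so the per-side-input step here is an assumption: it releases every queued `(task, block_until)` pair whose timestamp the new watermark has reached. `Task`, `unblock_for_side_input` and `update_and_unblock` are hypothetical names, and a bare number stands in for the watermark object.

```python
import collections


class Task(object):
  """Hypothetical stand-in for a blocked TransformExecutor task."""

  def __init__(self, name):
    self.name = name
    self.blocked = True


def unblock_for_side_input(blocked_tasks, output_watermark):
  """Releases tasks the watermark has reached; mutates blocked_tasks."""
  unblocked = []
  still_blocked = collections.deque()
  for task, block_until in blocked_tasks:
    if output_watermark >= block_until:
      task.blocked = False
      unblocked.append(task)
    else:
      still_blocked.append((task, block_until))
  # Keep only the tasks that are still waiting.
  blocked_tasks.clear()
  blocked_tasks.extend(still_blocked)
  return unblocked


def update_and_unblock(transform_to_side_inputs, blocked_by_side_input,
                       ptransform, output_watermark):
  """Visits each side input produced by ptransform and collects released tasks."""
  unblocked_tasks = []
  for side in transform_to_side_inputs[ptransform]:
    unblocked_tasks.extend(
        unblock_for_side_input(blocked_by_side_input[side], output_watermark))
  return unblocked_tasks


a, b, c = Task('a'), Task('b'), Task('c')
transform_to_side_inputs = {'producer': ['side_1', 'side_2']}
blocked = {
    'side_1': collections.deque([(a, 5), (b, 20)]),
    'side_2': collections.deque([(c, 10)]),
}
released = update_and_unblock(transform_to_side_inputs, blocked, 'producer', 10)
names = [task.name for task in released]
```

With a watermark of 10, tasks `a` and `c` are released while `b` stays queued until the watermark reaches 20.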

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86688&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86688
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 18:21
Start Date: 02/Apr/18 18:21
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178610974
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   Both parts are needed.
   `# For non-empty bundles, store the window of the max EOW.` was part of the 
initial PR.
   `# TODO(mariagh): Move...` was added as a review request.




Issue Time Tracking
---

Worklog Id: (was: 86688)
Time Spent: 10h 10m  (was: 10h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86665&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86665
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 17:25
Start Date: 02/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178595803
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view,

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-04-02 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86664&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86664
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 02/Apr/18 17:25
Start Date: 02/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178595922
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,14 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    # if input_bundle._elements:
 
 Review comment:
   Please remove commented out code.




Issue Time Tracking
---

Worklog Id: (was: 86664)
Time Spent: 9h 50m  (was: 9h 40m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86335
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 01/Apr/18 02:02
Start Date: 01/Apr/18 02:02
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178446105
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view,

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86333
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 23:00
Start Date: 31/Mar/18 23:00
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178444045
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, 
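
For readers following the hunk above, here is a minimal sketch of the
block-or-return behavior that get_value_or_block_until_ready and the
watermark update describe. It is not the Beam implementation:
SideInputsSketch, the dict-based tasks, and the plain integer watermarks
are hypothetical simplifications, and the unblocking loop is an assumption,
since the body of _update_watermarks_for_side_input_and_unblock_tasks is not
part of the quoted diff.

```python
import collections
import threading


class SideInputsSketch(object):
  """Toy stand-in for a side inputs container (hypothetical, not Beam)."""

  def __init__(self, side_inputs):
    self._lock = threading.Lock()
    self._elements = {side: [] for side in side_inputs}
    self._watermark = {side: None for side in side_inputs}
    # side input -> list of (task, block_until) pairs still waiting.
    self._blocked = collections.defaultdict(list)

  def add_values(self, side_input, values):
    with self._lock:
      self._elements[side_input].extend(values)

  def get_value_or_block_until_ready(self, side_input, task, block_until):
    """Returns the collected values, or None after blocking the task."""
    with self._lock:
      watermark = self._watermark[side_input]
      if watermark is not None and watermark >= block_until:
        return list(self._elements[side_input])
      self._blocked[side_input].append((task, block_until))
      task['blocked'] = True
      return None

  def update_watermark_and_unblock_tasks(self, side_input, watermark):
    """Records the new watermark and returns the tasks it unblocks."""
    with self._lock:
      self._watermark[side_input] = watermark
      unblocked, still_blocked = [], []
      for task, block_until in self._blocked[side_input]:
        if watermark >= block_until:
          task['blocked'] = False
          unblocked.append(task)
        else:
          still_blocked.append((task, block_until))
      self._blocked[side_input] = still_blocked
      return unblocked
```

In this sketch a task that asks for a side input before the watermark reaches
block_until is parked, and a later watermark update hands it back to the
caller so it can be rescheduled and ask again.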

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86274=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86274
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:43
Start Date: 31/Mar/18 07:43
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425977
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86271=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86271
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:20
Start Date: 31/Mar/18 07:20
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425447
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values):
     Raises:
       ValueError: If values cannot be converted into the requested form.
     """
-    return sideinputs.SideInputMap(type(view), view._view_options(), values)
+    return sideinputs.SideInputMap(type(side_input),
 
 Review comment:
   That method was existing code. I simply changed, for consistency, the name 
of a variable that appears to be an _UnpickledSideInput in the uses I gave it 
(and currently given by the project). But perhaps its designer intended a more 
generic scope. I can't tell.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86271)
Time Spent: 8h 50m  (was: 8h 40m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86272=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86272
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:20
Start Date: 31/Mar/18 07:20
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425556
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,13 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    if input_bundle._elements:
 
 Review comment:
   Indeed better. Thanks!
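
The added comment in the hunk above says that, for non-empty bundles, the
executor stores the window of the max EOW (end of window). The hunk is cut
off before the body, so the following is only a sketch of one way to pick
that window. Window, Element, and latest_window are hypothetical names for
illustration, not the Beam classes, and how the executor uses the result is
not visible in the quoted diff.

```python
import collections

# Hypothetical stand-ins for windowed elements; not the Beam types.
Window = collections.namedtuple('Window', ['start', 'end'])
Element = collections.namedtuple('Element', ['value', 'windows'])


def latest_window(elements):
  """Returns the window with the greatest end timestamp, or None if empty."""
  latest = None
  for element in elements:
    for window in element.windows:
      if latest is None or window.end > latest.end:
        latest = window
  return latest
```

Returning None for an empty bundle mirrors the "non-empty bundles" guard in
the comment: there is no window to store when the bundle has no elements.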


Issue Time Tracking
---

Worklog Id: (was: 86272)
Time Spent: 9h  (was: 8h 50m)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86268=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86268
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:18
Start Date: 31/Mar/18 07:18
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425519
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
 
 Review comment:
   The first line of the docstring is a summary. The next two are a more 
detailed explanation. Then below I write the function's returns. However, I 
have changed the word 'return' from L90 to 'gets', so that it doesn't sound 
repetitive.


Issue Time Tracking
---

Worklog Id: (was: 86268)
Time Spent: 8h 20m  (was: 8h 10m)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86270=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86270
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:18
Start Date: 31/Mar/18 07:18
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425519
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
 
 Review comment:
   The first line of the docstring is a summary. The next two are a more 
detailed explanation. Then below I write the function's returns. However, I 
have changed the word 'returns' from L90 to 'gets', so that it doesn't sound 
repetitive.


Issue Time Tracking
---

Worklog Id: (was: 86270)
Time Spent: 8h 40m  (was: 8.5h)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86269
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:18
Start Date: 31/Mar/18 07:18
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425519
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
 
 Review comment:
   The first line of the docstring is a summary. The next two are a more 
detailed explanation. Then below I write the function's returns. However, I 
have changed the word 'return' from L90 to 'gets', so that it doesn't sound 
repetitive.


Issue Time Tracking
---

Worklog Id: (was: 86269)
Time Spent: 8.5h  (was: 8h 20m)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86265
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:15
Start Date: 31/Mar/18 07:15
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425454
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86266=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86266
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:15
Start Date: 31/Mar/18 07:15
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425458
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
 
 Review comment:
   Done.
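
For readers following the thread, the behaviour this docstring describes can be sketched in isolation. The classes below are hypothetical stand-ins, not Beam code: `Task` replaces TransformExecutor, the watermark is assumed to be a plain number rather than a watermark object, and the value returned is simply a list of the elements seen so far.

```python
import collections
import threading


class Task(object):
  """Hypothetical stand-in for a TransformExecutor."""

  def __init__(self, name):
    self.name = name
    self.blocked = False


class SideInputSketch(object):
  """Hypothetical, simplified container for a single side input."""

  def __init__(self):
    self._lock = threading.Lock()
    self.elements = []
    self.watermark = None  # Assumption: a plain number, None until first update.
    self.blocked_tasks = collections.deque()

  def add_values(self, values):
    with self._lock:
      self.elements.extend(values)

  def get_value_or_block_until_ready(self, task, block_until):
    """Returns the elements seen so far, or None after blocking the task."""
    with self._lock:
      if self.watermark is not None and self.watermark >= block_until:
        return list(self.elements)
      self.blocked_tasks.append((task, block_until))
      task.blocked = True
      return None

  def update_watermark_and_unblock_tasks(self, watermark):
    """Records the new watermark and returns the tasks it unblocks."""
    with self._lock:
      self.watermark = watermark
      unblocked = []
      still_blocked = collections.deque()
      for task, block_until in self.blocked_tasks:
        if watermark >= block_until:
          task.blocked = False
          unblocked.append(task)
        else:
          still_blocked.append((task, block_until))
      self.blocked_tasks = still_blocked
      return unblocked
```

In this sketch a task is released by the watermark alone, so a side input that never receives elements still unblocks its tasks once the watermark passes their `block_until` timestamp.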


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 86266)
Time Spent: 8h 10m  (was: 8h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86264=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86264
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:15
Start Date: 31/Mar/18 07:15
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425450
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-31 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86263=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86263
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 07:14
Start Date: 31/Mar/18 07:14
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178425447
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values):
     Raises:
       ValueError: If values cannot be converted into the requested form.
     """
-    return sideinputs.SideInputMap(type(view), view._view_options(), values)
+    return sideinputs.SideInputMap(type(side_input),
 
 Review comment:
   That method is existing code. I simply renamed a variable that appears to 
be an _UnpickledSideInput in every use I gave it (and in the project's current 
uses). Its designer may have intended a more generic scope, but I can't tell.


Issue Time Tracking
---

Worklog Id: (was: 86263)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86226=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86226
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178409496
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
 
 Review comment:
   You can use ``SideInputMap`` notation for these so that they render 
properly in the docs. (You do not need to do this in this PR; it is for future 
reference.)
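
As an illustration of the notation suggested here, the docstring from the diff might be written as below. This is only a sketch: double backticks are reStructuredText inline-literal markup, the wording of the Returns section is lightly adjusted, and the function body is a placeholder that is not part of the PR.

```python
def get_value_or_block_until_ready(side_input, task, block_until):
  """Returns the value of a side input, or blocks the task waiting on it.

  Args:
    side_input: ``_UnpickledSideInput`` value.
    task: ``TransformExecutor`` task waiting on a side input.
    block_until: Timestamp after which the task gets unblocked.

  Returns:
    The ``SideInputMap`` value of a view when the tasks it blocks are
    unblocked. Otherwise, None.
  """
  # Placeholder body: only the docstring notation is being illustrated.
  return None
```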


Issue Time Tracking
---

Worklog Id: (was: 86226)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86228=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86228
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178409693
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86229=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86229
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178411799
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,13 @@ def __init__(self, transform_evaluator_registry, evaluation_context,
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    if input_bundle._elements:
 
 Review comment:
   `if input_bundle.has_elements():` ?
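
A minimal sketch of the accessor style suggested here, for illustration only. `BundleSketch` and `describe` are hypothetical names; this excerpt does not show whether the real bundle class exposes `has_elements()`, so that method is an assumption taken from the review comment.

```python
class BundleSketch(object):
  """Hypothetical stand-in for the DirectRunner's bundle class."""

  def __init__(self):
    self._elements = []

  def add(self, element):
    self._elements.append(element)

  def has_elements(self):
    """Returns True if at least one element has been added."""
    return bool(self._elements)


def describe(input_bundle):
  # The caller asks the bundle instead of reading its private _elements list.
  if input_bundle.has_elements():
    return 'non-empty'
  return 'empty'
```

Going through a public method keeps callers independent of how the bundle stores its elements.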


Issue Time Tracking
---

Worklog Id: (was: 86229)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86230=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86230
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178410118
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86227
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178410646
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -138,7 +180,9 @@ def _pvalue_to_value(self, view, values):
     Raises:
       ValueError: If values cannot be converted into the requested form.
     """
-    return sideinputs.SideInputMap(type(view), view._view_options(), values)
+    return sideinputs.SideInputMap(type(side_input),
 
 Review comment:
   The Args section mentions that side_input is an _UnpickledSideInput object. 
If that is known, why do we need type(side_input) here? Could the type be 
something other than _UnpickledSideInput?
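
One possible reading of why the class is taken with type() can be sketched as follows. This is an assumption, not something the thread confirms: if side inputs came in several concrete classes, passing the class along would let the conversion depend on it. Every name below is hypothetical and none of it is Beam's API.

```python
class AsSingletonSketch(object):
  """Hypothetical view class: exactly one element is expected."""

  @staticmethod
  def from_values(values):
    values = list(values)
    if len(values) != 1:
      raise ValueError('Expected exactly one element, got %d.' % len(values))
    return values[0]


class AsListSketch(object):
  """Hypothetical view class: all elements are returned as a list."""

  @staticmethod
  def from_values(values):
    return list(values)


def pvalue_to_value(side_input, values):
  # type(side_input) selects the conversion defined by the concrete class.
  return type(side_input).from_values(values)
```

If only one class can ever reach this method, the lookup adds nothing, which is the point the question raises.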


Issue Time Tracking
---

Worklog Id: (was: 86227)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86231=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86231
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 31/Mar/18 00:19
Start Date: 31/Mar/18 00:19
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4949: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: 
unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178408890
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
 
 Review comment:
   Why are there statements about what this method returns?


Issue Time Tracking
---

Worklog Id: (was: 86231)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86208=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86208
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:26
Start Date: 30/Mar/18 23:26
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #4949: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part 2: unblock tasks as 
the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#issuecomment-377645126
 
 
   Thanks, Charles.
   @aaltay: Can you please merge after tests pass?


Issue Time Tracking
---

Worklog Id: (was: 86208)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86205&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86205
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:25
Start Date: 30/Mar/18 23:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178405654
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
 self._lock = threading.Lock()
 self._views = {}
-self._transform_to_views = collections.defaultdict(list)
+self._transform_to_side_inputs = collections.defaultdict(list)
+self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-for view in views:
-  self._views[view] = _SideInputView(view)
-  self._transform_to_views[view.pvalue.producer].append(view)
+for side in side_inputs:
+  self._views[side] = _SideInputView(side)
+  self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
 views_string = (', '.join(str(elm) for elm in self._views.values())
 if self._views.values() else '[]')
 return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+"""Returns the value of a view whose task is unblocked or blocks its task.
+
+It returns the value of a view whose watermark has been updated and
+surpasses a given value.
+
+Args:
+  side_input: (_UnpickledSideInput) value.
+  task: (TransformExecutor) task waiting on a side input.
+  block_until: Timestamp after which the task gets unblocked.
+
+Returns:
+  The value of a view when 'its' task is unblocked (otherwise, None).
+"""
 with self._lock:
   view = self._views[side_input]
-  if not view.has_result:
-view.callable_queue.append(task)
+  if view.watermark and view.watermark.input_watermark >= block_until:
+view.value = self._pvalue_to_value(side_input, view.elements)
+return view.value
+  else:
+view.blocked_tasks.append((task, block_until))
 task.blocked = True
-  return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
 with self._lock:
   view = self._views[side_input]
-  assert not view.has_result
   view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-with self._lock:
-  view = self._views[side_input]
-  assert not view.has_result
-  assert view.value is None
-  assert view.callable_queue is not None
-  view.value = self._pvalue_to_value(side_input, view.elements)
-  view.elements = None
-  result = tuple(view.callable_queue)
-  for task in result:
-task.blocked = False
-  view.callable_queue = None
-  view.has_result = True
-  return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-# Collect tasks that get unblocked as the workflow progresses.
-unblocked_tasks = []
-for view in self._transform_to_views[ptransform]:
-  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+ptransform,
+watermark):
+"""Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+It traverses the list of side inputs per PTransform and calls
+_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+Args:
+  ptransform: Value of a PTransform.
+  watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+Returns:
+  Tasks that get unblocked as a result of the watermark advancing.
+"""
 unblocked_tasks = []
-if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-  unblocked_tasks = self.finalize_value_and_get_tasks(view)
+for side in self._transform_to_side_inputs[ptransform]:
+  unblocked_tasks.extend(
+  self._update_watermarks_for_side_input_and_unblock_tasks(
+  side, watermark))
 return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-"""Given a side input view, returns the associated value in 
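
The blocking behavior described in the `get_value_or_block_until_ready` docstring can be sketched roughly as follows. This is a minimal illustration, not the Beam implementation: the class names (`SideInputViewSketch`, `TaskSketch`, `SideInputsContainerSketch`) are hypothetical, watermarks are plain numbers rather than watermark objects, and the unblocking method is only a guess at what `_update_watermarks_for_side_input_and_unblock_tasks` might do, since its body is not shown in this hunk.

```python
import collections
import threading


class SideInputViewSketch(object):
  """Hypothetical stand-in for _SideInputView (not the Beam class)."""

  def __init__(self):
    self.blocked_tasks = collections.deque()
    self.elements = []
    self.value = None
    self.watermark = None  # Assumption: a plain number, not a watermark object.


class TaskSketch(object):
  """Hypothetical stand-in for a TransformExecutor task."""

  def __init__(self, name):
    self.name = name
    self.blocked = False


class SideInputsContainerSketch(object):
  """Hypothetical container keyed by side input."""

  def __init__(self, side_inputs):
    self._lock = threading.Lock()
    self._views = {side: SideInputViewSketch() for side in side_inputs}

  def add_values(self, side_input, values):
    with self._lock:
      self._views[side_input].elements.extend(values)

  def get_value_or_block_until_ready(self, side_input, task, block_until):
    """Returns the value if the watermark reached block_until, else blocks."""
    with self._lock:
      view = self._views[side_input]
      if view.watermark is not None and view.watermark >= block_until:
        # Assumption: the "value" is simply a copy of the elements seen so far.
        view.value = list(view.elements)
        return view.value
      view.blocked_tasks.append((task, block_until))
      task.blocked = True
      return None

  def update_watermark_and_unblock_tasks(self, side_input, watermark):
    """Records the new watermark and returns the tasks it unblocks."""
    with self._lock:
      view = self._views[side_input]
      view.watermark = watermark
      unblocked = []
      still_blocked = collections.deque()
      for task, block_until in view.blocked_tasks:
        if watermark >= block_until:
          task.blocked = False
          unblocked.append(task)
        else:
          still_blocked.append((task, block_until))
      view.blocked_tasks = still_blocked
      return unblocked
```

In this sketch a task that asks too early is parked together with its `block_until` timestamp and is released only once the side input's watermark reaches that timestamp; the released task is then expected to call `get_value_or_block_until_ready` again. The sketch compares against `None` explicitly, which differs slightly from the truthiness check in the diff.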

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86206&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86206
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:25
Start Date: 30/Mar/18 23:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178405693
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
 self._lock = threading.Lock()
 self._views = {}
-self._transform_to_views = collections.defaultdict(list)
+self._transform_to_side_inputs = collections.defaultdict(list)
+self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-for view in views:
-  self._views[view] = _SideInputView(view)
-  self._transform_to_views[view.pvalue.producer].append(view)
+for side in side_inputs:
+  self._views[side] = _SideInputView(side)
+  self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
 views_string = (', '.join(str(elm) for elm in self._views.values())
 if self._views.values() else '[]')
 return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+"""Returns the value of a view whose task is unblocked or blocks its task.
+
+It returns the value of a view whose watermark has been updated and
+surpasses a given value.
+
+Args:
+  side_input: (_UnpickledSideInput) value.
+  task: (TransformExecutor) task waiting on a side input.
+  block_until: Timestamp after which the task gets unblocked.
+
+Returns:
+  The value of a view when 'its' task is unblocked (otherwise, None).
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 86206)
Time Spent: 6h 20m  (was: 6h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86207
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:25
Start Date: 30/Mar/18 23:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178405734
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -144,8 +144,10 @@ def _refresh_watermarks(self, applied_ptransform, 
side_inputs_container):
 for consumer in consumers:
   unblocked_tasks.extend(
   self._refresh_watermarks(consumer, side_inputs_container))
+  # notify the side_inputs_container
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 86207)
Time Spent: 6.5h  (was: 6h 20m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86203&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86203
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:24
Start Date: 30/Mar/18 23:24
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178405564
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,12 @@ def __init__(self, transform_evaluator_registry, 
evaluation_context,
 self._transform_evaluator_registry = transform_evaluator_registry
 self._evaluation_context = evaluation_context
 self._input_bundle = input_bundle
+# For non-empty bundles, store the window of the max EOW
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 86203)
Time Spent: 5h 50m  (was: 5h 40m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86204&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86204
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 23:24
Start Date: 30/Mar/18 23:24
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178405629
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
 self._lock = threading.Lock()
 self._views = {}
-self._transform_to_views = collections.defaultdict(list)
+self._transform_to_side_inputs = collections.defaultdict(list)
+self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-for view in views:
-  self._views[view] = _SideInputView(view)
-  self._transform_to_views[view.pvalue.producer].append(view)
+for side in side_inputs:
+  self._views[side] = _SideInputView(side)
+  self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
 views_string = (', '.join(str(elm) for elm in self._views.values())
 if self._views.values() else '[]')
 return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+"""Returns the value of a view whose task is unblocked or blocks its task.
+
+It returns the value of a view whose watermark has been updated and
+surpasses a given value.
+
+Args:
+  side_input: (_UnpickledSideInput) value.
+  task: (TransformExecutor) task waiting on a side input.
+  block_until: Timestamp after which the task gets unblocked.
+
+Returns:
+  The value of a view when 'its' task is unblocked (otherwise, None).
+"""
 with self._lock:
   view = self._views[side_input]
-  if not view.has_result:
-view.callable_queue.append(task)
+  if view.watermark and view.watermark.input_watermark >= block_until:
+view.value = self._pvalue_to_value(side_input, view.elements)
+return view.value
+  else:
+view.blocked_tasks.append((task, block_until))
 task.blocked = True
-  return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
 with self._lock:
   view = self._views[side_input]
-  assert not view.has_result
   view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-with self._lock:
-  view = self._views[side_input]
-  assert not view.has_result
-  assert view.value is None
-  assert view.callable_queue is not None
-  view.value = self._pvalue_to_value(side_input, view.elements)
-  view.elements = None
-  result = tuple(view.callable_queue)
-  for task in result:
-task.blocked = False
-  view.callable_queue = None
-  view.has_result = True
-  return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-# Collect tasks that get unblocked as the workflow progresses.
-unblocked_tasks = []
-for view in self._transform_to_views[ptransform]:
-  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+ptransform,
+watermark):
+"""Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+It traverses the list of side inputs per PTransform and calls
+_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+Args:
+  ptransform: Value of a PTransform.
+  watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+Returns:
+  Tasks that get unblocked as a result of the watermark advancing.
+"""
 unblocked_tasks = []
-if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-  unblocked_tasks = self.finalize_value_and_get_tasks(view)
+for side in self._transform_to_side_inputs[ptransform]:
+  unblocked_tasks.extend(
+  self._update_watermarks_for_side_input_and_unblock_tasks(
+  side, watermark))
 return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-"""Given a side input view, returns the associated value in 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86081
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331816
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
 self._lock = threading.Lock()
 self._views = {}
-self._transform_to_views = collections.defaultdict(list)
+self._transform_to_side_inputs = collections.defaultdict(list)
+self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-for view in views:
-  self._views[view] = _SideInputView(view)
-  self._transform_to_views[view.pvalue.producer].append(view)
+for side in side_inputs:
+  self._views[side] = _SideInputView(side)
+  self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
 views_string = (', '.join(str(elm) for elm in self._views.values())
 if self._views.values() else '[]')
 return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+"""Returns the value of a view whose task is unblocked or blocks its task.
+
+It returns the value of a view whose watermark has been updated and
+surpasses a given value.
+
+Args:
+  side_input: (_UnpickledSideInput) value.
+  task: (TransformExecutor) task waiting on a side input.
+  block_until: Timestamp after which the task gets unblocked.
+
+Returns:
+  The value of a view when 'its' task is unblocked (otherwise, None).
+"""
 with self._lock:
   view = self._views[side_input]
-  if not view.has_result:
-view.callable_queue.append(task)
+  if view.watermark and view.watermark.input_watermark >= block_until:
+view.value = self._pvalue_to_value(side_input, view.elements)
+return view.value
+  else:
+view.blocked_tasks.append((task, block_until))
 task.blocked = True
-  return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
 with self._lock:
   view = self._views[side_input]
-  assert not view.has_result
   view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-with self._lock:
-  view = self._views[side_input]
-  assert not view.has_result
-  assert view.value is None
-  assert view.callable_queue is not None
-  view.value = self._pvalue_to_value(side_input, view.elements)
-  view.elements = None
-  result = tuple(view.callable_queue)
-  for task in result:
-task.blocked = False
-  view.callable_queue = None
-  view.has_result = True
-  return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-# Collect tasks that get unblocked as the workflow progresses.
-unblocked_tasks = []
-for view in self._transform_to_views[ptransform]:
-  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+ptransform,
+watermark):
+"""Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+It traverses the list of side inputs per PTransform and calls
+_update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+Args:
+  ptransform: Value of a PTransform.
+  watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+Returns:
+  Tasks that get unblocked as a result of the watermark advancing.
+"""
 unblocked_tasks = []
-if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-  unblocked_tasks = self.finalize_value_and_get_tasks(view)
+for side in self._transform_to_side_inputs[ptransform]:
+  unblocked_tasks.extend(
+  self._update_watermarks_for_side_input_and_unblock_tasks(
+  side, watermark))
 return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-"""Given a side input view, returns the associated value 

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86083&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86083
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331820
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -288,11 +295,16 @@ def call(self):
 scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
 for side_input in self._applied_ptransform.side_inputs:
+  # Find the projection of main's window onto the side input's window
 
 Review comment:
   
   Period at end of sentence.
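
   For readers unfamiliar with the phrase in the code comment, "projecting the main input's window onto the side input's window" can be illustrated with the sketch below. It assumes integer-second timestamps, half-open `[start, end)` windows, and a side input that uses fixed windows of a given size; `IntervalWindowSketch` and `project_onto_fixed_windows` are hypothetical names, and mapping via the main window's last timestamp is an assumption made for illustration, not something this diff confirms.

```python
import collections

# Hypothetical half-open interval window [start, end), in integer seconds.
IntervalWindowSketch = collections.namedtuple(
    'IntervalWindowSketch', ['start', 'end'])


def project_onto_fixed_windows(main_window, side_window_size):
  """Returns the fixed side window containing the main window's last second."""
  # The end is exclusive, so the last timestamp inside the window is end - 1.
  last_timestamp = main_window.end - 1
  # Python's % floors toward negative infinity, so this also works for
  # negative timestamps.
  start = last_timestamp - (last_timestamp % side_window_size)
  return IntervalWindowSketch(start, start + side_window_size)


# A task reading this side input would then wait until the side input's
# watermark reaches the end of the projected window.
main = IntervalWindowSketch(50, 70)
side = project_onto_fixed_windows(main, 60)
block_until = side.end
```

   Under these assumptions a main window of `[50, 70)` projects onto the side window `[60, 120)`, so `block_until` is 120.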




Issue Time Tracking
---

Worklog Id: (was: 86083)
Time Spent: 5h 20m  (was: 5h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86085&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86085
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331814
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/executor.py
 ##
 @@ -271,6 +272,12 @@ def __init__(self, transform_evaluator_registry, 
evaluation_context,
 self._transform_evaluator_registry = transform_evaluator_registry
 self._evaluation_context = evaluation_context
 self._input_bundle = input_bundle
+# For non-empty bundles, store the window of the max EOW
 
 Review comment:
   
   Period at end of sentence.
   
   Can you add a TODO to modify the Bundle class to retrieve this more 
efficiently?




Issue Time Tracking
---

Worklog Id: (was: 86085)
Time Spent: 5h 40m  (was: 5.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86082&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86082
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331817
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
 self._lock = threading.Lock()
 self._views = {}
-self._transform_to_views = collections.defaultdict(list)
+self._transform_to_side_inputs = collections.defaultdict(list)
+self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-for view in views:
-  self._views[view] = _SideInputView(view)
-  self._transform_to_views[view.pvalue.producer].append(view)
+for side in side_inputs:
+  self._views[side] = _SideInputView(side)
+  self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
 views_string = (', '.join(str(elm) for elm in self._views.values())
 if self._views.values() else '[]')
 return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+"""Returns the value of a view whose task is unblocked or blocks its task.
+
+It returns the value of a view whose watermark has been updated and
+surpasses a given value.
+
+Args:
+  side_input: (_UnpickledSideInput) value.
+  task: (TransformExecutor) task waiting on a side input.
+  block_until: Timestamp after which the task gets unblocked.
+
+Returns:
+  The value of a view when 'its' task is unblocked (otherwise, None).
 
 Review comment:
   
   Do you need quotes around "its"?  Can you clarify that the return value is a 
`sideinputs.SideInputMap`?




Issue Time Tracking
---

Worklog Id: (was: 86082)
Time Spent: 5h 20m  (was: 5h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86086
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331819
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -144,8 +144,10 @@ def _refresh_watermarks(self, applied_ptransform, 
side_inputs_container):
 for consumer in consumers:
   unblocked_tasks.extend(
   self._refresh_watermarks(consumer, side_inputs_container))
+  # notify the side_inputs_container
 
 Review comment:
   
   Please fix capitalization, period end of sentence.
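
   The hunk under review walks a transform's consumers recursively and then notifies the side inputs container. A stripped-down sketch of that shape is given below. Everything in it is an assumption made for illustration: the function signature, the dictionary-based graph and watermark table, the `RecordingContainerSketch` class, and the ordering of the notification relative to the recursion (the indentation in the quoted hunk is not reliable). It also omits any check of whether a watermark actually advanced.

```python
class RecordingContainerSketch(object):
  """Hypothetical container that records notifications and unblocks nothing."""

  def __init__(self):
    self.notified = []

  def update_watermarks_for_transform_and_unblock_tasks(self, ptransform,
                                                        watermark):
    self.notified.append((ptransform, watermark))
    return []


def refresh_watermarks(transform, consumers_of, watermarks, container):
  """Visits consumers depth-first, then notifies the container."""
  unblocked_tasks = []
  for consumer in consumers_of.get(transform, []):
    unblocked_tasks.extend(
        refresh_watermarks(consumer, consumers_of, watermarks, container))
  # Notify the side inputs container about this transform's watermark.
  unblocked_tasks.extend(
      container.update_watermarks_for_transform_and_unblock_tasks(
          transform, watermarks[transform]))
  return unblocked_tasks


container = RecordingContainerSketch()
tasks = refresh_watermarks(
    'read', {'read': ['parse'], 'parse': []}, {'read': 5, 'parse': 3},
    container)
```

   The point of returning the collected list is that the caller ends up with every task that became runnable anywhere downstream, so it can reschedule them in one place.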




Issue Time Tracking
---

Worklog Id: (was: 86086)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-30 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86084
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 30/Mar/18 17:20
Start Date: 30/Mar/18 17:20
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178331815
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The value of a view when 'its' task is unblocked (otherwise, None).
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, returns the associated value 
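
Editorial sketch: the blocking behaviour in the hunk above can be approximated outside of Beam. This is a minimal illustration under stated assumptions, not the PR's code. Watermarks are plain numbers rather than Beam watermark objects, tasks are arbitrary Python values, and the names SideInputView, SideInputsContainer and update_watermark_and_unblock_tasks are hypothetical stand-ins that only loosely mirror the diff.

```python
import threading


class SideInputView:
    """Hypothetical, simplified stand-in for _SideInputView."""

    def __init__(self):
        self.elements = []
        self.watermark = None    # last watermark seen for the producer
        self.blocked_tasks = []  # (task, block_until) pairs


class SideInputsContainer:
    """Hypothetical, simplified stand-in for _SideInputsContainer."""

    def __init__(self, side_inputs):
        self._lock = threading.Lock()
        self._views = {side: SideInputView() for side in side_inputs}

    def add_values(self, side_input, values):
        with self._lock:
            self._views[side_input].elements.extend(values)

    def get_value_or_block_until_ready(self, side_input, task, block_until):
        """Returns the elements if the watermark has reached block_until.

        Otherwise the task is recorded as blocked and None is returned.
        """
        with self._lock:
            view = self._views[side_input]
            if view.watermark is not None and view.watermark >= block_until:
                return list(view.elements)
            view.blocked_tasks.append((task, block_until))
            return None

    def update_watermark_and_unblock_tasks(self, side_input, watermark):
        """Stores the new watermark and returns the tasks it unblocks."""
        with self._lock:
            view = self._views[side_input]
            view.watermark = watermark
            ready = [t for t, until in view.blocked_tasks if watermark >= until]
            view.blocked_tasks = [
                (t, until) for t, until in view.blocked_tasks if watermark < until]
            return ready
```

In this sketch a task that asks for the side input too early gets None back and is handed back to the caller later, once a watermark update reaches its block_until value; the caller would then reschedule it and ask again.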

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=84671&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84671
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 27/Mar/18 02:29
Start Date: 27/Mar/18 02:29
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #4949: [BEAM-3818] WIP: 
Add support for streaming side inputs in the DirectRunner (part 2: unblock 
tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#issuecomment-376375686
 
 
   R: @charlesccychen 




Issue Time Tracking
---

Worklog Id: (was: 84671)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-26 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=84332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-84332
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 26/Mar/18 12:37
Start Date: 26/Mar/18 12:37
Worklog Time Spent: 10m 
  Work Description: mariapython opened a new pull request #4949: 
[BEAM-3818] WIP: Add support for streaming side inputs in the DirectRunner 
(part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949
 
 
- [x] Unblock tasks as the _SideInputsContainer gets updated.
- [ ] Add more tests.
   




Issue Time Tracking
---

Worklog Id: (was: 84332)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-19 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81917
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 19/Mar/18 16:51
Start Date: 19/Mar/18 16:51
Worklog Time Spent: 10m 
  Work Description: aaltay closed pull request #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part I: update 
_SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py 
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 46176c9e969..d0ab55f5462 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -56,6 +56,11 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements_string = (', '.join(str(elm) for elm in self.elements)
+                       if self.elements else '[]')
+    return '_SideInputView(elements=%s)' % elements_string
+
 
 class _SideInputsContainer(object):
   """An in-process container for side inputs.
@@ -67,8 +72,16 @@ class _SideInputsContainer(object):
   def __init__(self, views):
     self._lock = threading.Lock()
     self._views = {}
+    self._transform_to_views = collections.defaultdict(list)
+
     for view in views:
       self._views[view] = _SideInputView(view)
+      self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+    views_string = (', '.join(str(elm) for elm in self._views.values())
+                    if self._views.values() else '[]')
+    return '_SideInputsContainer(_views=%s)' % views_string
 
   def get_value_or_schedule_after_output(self, side_input, task):
     with self._lock:
@@ -99,6 +112,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
       return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses.
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
+      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    return unblocked_tasks
+
   def _pvalue_to_value(self, view, values):
     """Given a side input view, returns the associated value in requested form.
 
@@ -149,10 +175,10 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms,
       self._pcollection_to_views[view.pvalue].append(view)
     self._transform_keyed_states = self._initialize_keyed_states(
         root_transforms, value_to_consumers)
+    self._side_inputs_container = _SideInputsContainer(views)
     self._watermark_manager = WatermarkManager(
         clock, root_transforms, value_to_consumers,
         self._transform_keyed_states)
-    self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
     self._metrics = DirectMetrics()
@@ -199,9 +225,6 @@ def handle_result(
       committed_bundles, unprocessed_bundles = self._commit_bundles(
           result.uncommitted_output_bundles,
           result.unprocessed_bundles)
-      self._watermark_manager.update_watermarks(
-          completed_bundle, result.transform, completed_timers,
-          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)
@@ -217,11 +240,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, 
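
Editorial sketch: in this first part of the change, a side input is only finalized once its producer's input watermark reaches positive infinity, and the tasks queued on it are returned to the caller at that point. The following is a rough, self-contained approximation of that rule, not the PR's code. The View class, the float("inf") stand-in for WatermarkManager.WATERMARK_POS_INF, the plain-number watermark and the has_result guard are all assumptions made for illustration.

```python
import collections

# Stand-in for WatermarkManager.WATERMARK_POS_INF (an assumption of this sketch).
WATERMARK_POS_INF = float("inf")


class View:
    """Simplified, hypothetical counterpart of _SideInputView."""

    def __init__(self):
        self.elements = []
        self.value = None
        self.has_result = False
        self.callable_queue = collections.deque()


def finalize_value_and_get_tasks(view):
    """Freezes the view's value and returns the tasks that were waiting on it."""
    view.value = list(view.elements)
    view.has_result = True
    tasks = tuple(view.callable_queue)
    view.callable_queue.clear()
    return tasks


def update_watermarks_for_transform(transform_to_views, ptransform,
                                    input_watermark):
    """Collects the tasks unblocked by a watermark update for one transform."""
    unblocked_tasks = []
    for view in transform_to_views[ptransform]:
        # Finalize only once the producer's input is known to be complete.
        if input_watermark == WATERMARK_POS_INF and not view.has_result:
            unblocked_tasks.extend(finalize_value_and_get_tasks(view))
    return unblocked_tasks
```

Any finite watermark leaves the queued tasks blocked, which is the limitation that the follow-up pull request (part 2) sets out to lift.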

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81724
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 19/Mar/18 05:11
Start Date: 19/Mar/18 05:11
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part I: update 
_SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#issuecomment-374102961
 
 
   @lukecwik: I see. Is there a way to squash remotely, i.e., so that Jenkins 
would know that you are only putting all the commits under one name?




Issue Time Tracking
---

Worklog Id: (was: 81724)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81697&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81697
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 18/Mar/18 22:31
Start Date: 18/Mar/18 22:31
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part I: update 
_SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#issuecomment-374058795
 
 
   @mariapython Anytime you make a change to the PR, the tests are rerun.
   
   GitHub/Jenkins doesn't know that your new single squashed commit is 
equivalent to your old commits.
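
Editorial sketch: one way to see why a squash looks like a brand-new change is that a Git commit's identifier is a hash over its tree, parents, author, committer and message, so a squashed commit gets a different identifier even when the final files are identical. The snippet below assumes the standard Git commit object layout; the helper name commit_id, the identity string and the 40-character placeholder hashes are made up for illustration.

```python
import hashlib


def commit_id(tree, parents, message,
              ident="A U Thor <author@example.com> 0 +0000"):
    """Computes a Git-style SHA-1 for a commit with the given fields."""
    lines = ["tree " + tree]
    lines += ["parent " + parent for parent in parents]
    lines += ["author " + ident, "committer " + ident, "", message, ""]
    body = "\n".join(lines).encode("utf-8")
    header = b"commit " + str(len(body)).encode("ascii") + b"\0"
    return hashlib.sha1(header + body).hexdigest()


tree = "a" * 40  # placeholder: same final file contents in both histories
base = "b" * 40  # placeholder: commit the branch started from

# Original history: two commits on top of base.
step_1 = commit_id("c" * 40, [base], "first change")
old_head = commit_id(tree, [step_1], "second change")

# Squashed history: one commit on top of base with the same final tree.
squashed_head = commit_id(tree, [base], "both changes, squashed")
```

Here old_head and squashed_head differ although both point at the same tree, so a CI system keyed on the head commit treats the squashed push as new work.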




Issue Time Tracking
---

Worklog Id: (was: 81697)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81696&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81696
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 18/Mar/18 22:23
Start Date: 18/Mar/18 22:23
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part I: update 
_SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#issuecomment-374057693
 
 
   OK, I just squashed the commits and pushed. I am not sure why that triggered 
tests again...




Issue Time Tracking
---

Worklog Id: (was: 81696)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81685&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81685
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 18/Mar/18 20:07
Start Date: 18/Mar/18 20:07
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175301812
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +242,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)
 
 Review comment:
   Makes sense. Thank you.




Issue Time Tracking
---

Worklog Id: (was: 81685)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81603
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:28
Start Date: 17/Mar/18 19:28
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267711
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +242,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)
 
 Review comment:
   I pushed all the changes earlier today and was going to say the same (the 
update needs to happen regardless). That is why this was not modified in my 
last commit.




Issue Time Tracking
---

Worklog Id: (was: 81603)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81604&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81604
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:28
Start Date: 17/Mar/18 19:28
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267611
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):
 
 Review comment:
   I already pushed the change earlier, sorry I forgot to comment here.
   Actually what I don't need is the  attribute, so I got rid of it.




Issue Time Tracking
---

Worklog Id: (was: 81604)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81602
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:25
Start Date: 17/Mar/18 19:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267622
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,84 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream  # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+               | beam.Map(recorded_elements))
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e'])
+                   .advance_processing_time(11))
+    side_stream = (p
+                   | 'side TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(2, 2)])
+                   .add_elements([window.TimestampedValue(1, 1)])
+                   .add_elements([window.TimestampedValue(4, 4)]))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream  # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+               | beam.Map(recorded_elements))
+
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
 
 Review comment:
   Will do




Issue Time Tracking
---

Worklog Id: (was: 81602)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81600&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81600
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:25
Start Date: 17/Mar/18 19:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267611
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):
 
 Review comment:
   I already pushed the change earlier, sorry I forgot to comment here.
   Actually what I don't need is the  attribute, since I got rid of it.




Issue Time Tracking
---

Worklog Id: (was: 81600)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81601
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:25
Start Date: 17/Mar/18 19:25
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267616
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform,
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)
 
-  def _refresh_watermarks(self, applied_ptransform):
+  def _refresh_watermarks(self, applied_ptransform, side_inputs_container):
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81601)
Time Spent: 3h 20m  (was: 3h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81597
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:10
Start Date: 17/Mar/18 19:10
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267215
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform,
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)
 
-  def _refresh_watermarks(self, applied_ptransform):
+  def _refresh_watermarks(self, applied_ptransform, side_inputs_container):
 
 Review comment:
   You're right--Maria, can you fix this?




Issue Time Tracking
---

Worklog Id: (was: 81597)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81598&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81598
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:10
Start Date: 17/Mar/18 19:10
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267206
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +242,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)
 
 Review comment:
   I think we need it to be more general than only if the current bundle is a 
view.  Any watermark update may trigger downstream views to be updated--for 
example, say my windowing is Fixed(10), and I have a single element at time 
t=5.  When the upstream watermark hits t=10, there may not be a bundle 
processed by the transform directly going into the view, so we do need this in 
the full generality, since the watermark update of the previous step could 
update the watermark of the transform emitting the view.
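
   To make the Fixed(10) example above concrete, here is a minimal pure-Python 
sketch. It is not DirectRunner code: the helper names `fixed_window` and 
`ready_windows` are hypothetical, and the readiness rule (a window's view is 
ready once the watermark reaches the window end) is a simplifying assumption.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window that contains `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def ready_windows(element_timestamps, size, watermark):
    """Return the windows whose end the watermark has reached (assumed rule)."""
    windows = {fixed_window(ts, size) for ts in element_timestamps}
    return sorted(w for w in windows if watermark >= w[1])


# A single element at t=5 with Fixed(10) lands in window [0, 10).
timestamps = [5]

# While the element's bundle is processed the watermark is still at 5,
# so no window is ready yet.
before = ready_windows(timestamps, size=10, watermark=5)

# Later only the upstream watermark advances to 10; no new bundle arrives
# for this transform, yet the window becomes ready.
after = ready_windows(timestamps, size=10, watermark=10)

print(before, after)
```

   The second call is the case the comment describes: readiness changes with 
no bundle to trigger the check, which is why the check cannot be limited to 
bundles that feed a view.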




Issue Time Tracking
---

Worklog Id: (was: 81598)
Time Spent: 3h  (was: 2h 50m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-17 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81596&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81596
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 19:10
Start Date: 17/Mar/18 19:10
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175267209
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):
 
 Review comment:
   You're right--Maria, can you fix this?




Issue Time Tracking
---

Worklog Id: (was: 81596)
Time Spent: 2h 50m  (was: 2h 40m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81483&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81483
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 02:26
Start Date: 17/Mar/18 02:26
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175245310
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,84 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream  # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+               | beam.Map(recorded_elements))
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e'])
+                   .advance_processing_time(11))
+    side_stream = (p
+                   | 'side TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(2, 2)])
+                   .add_elements([window.TimestampedValue(1, 1)])
+                   .add_elements([window.TimestampedValue(4, 4)]))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream  # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+               | beam.Map(recorded_elements))
+
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
 
 Review comment:
   For the follow-up PR, please add some non-trivial side input tests. 
(Suggestions: a test with partial side inputs in the accessible window, and 
another with more main elements.)
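
   As a rough illustration of what such a test could assert, the following 
pure-Python sketch models the expected records. It does not use the Beam API: 
`window_start` and `visible_side_values` are hypothetical helpers, and it 
assumes the main and side inputs share the same fixed windows, so a main 
element only sees side values from its own window.

```python
def window_start(timestamp, size):
    """Start of the fixed window of width `size` that contains `timestamp`."""
    return timestamp - (timestamp % size)


def visible_side_values(main_timestamp, side_elements, size):
    """Side values whose window matches the main element's window.

    `side_elements` is a list of (value, timestamp) pairs.
    """
    target = window_start(main_timestamp, size)
    return [value for value, ts in side_elements
            if window_start(ts, size) == target]


# Side input elements as (value, timestamp); 12 falls in the next window.
side = [(2, 2), (1, 1), (4, 4), (12, 12)]
# More than one main element, in two different windows.
mains = [('e', 3), ('f', 11)]

records = [(elem, ts, visible_side_values(ts, side, size=10))
           for elem, ts in mains]
print(records)
```

   Each main element sees only part of the side input, which covers both 
suggestions in one scenario.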




Issue Time Tracking
---

Worklog Id: (was: 81483)
Time Spent: 2h 20m  (was: 2h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81485&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81485
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 02:26
Start Date: 17/Mar/18 02:26
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175244992
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +242,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)
 
 Review comment:
   Should this stay inside the `if` condition? Checking for unblocked tasks is 
only necessary when a view is updated.




Issue Time Tracking
---

Worklog Id: (was: 81485)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81484&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81484
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 02:26
Start Date: 17/Mar/18 02:26
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175245107
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -128,8 +129,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform,
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)
 
-  def _refresh_watermarks(self, applied_ptransform):
+  def _refresh_watermarks(self, applied_ptransform, side_inputs_container):
 
 Review comment:
   Similar to the above comment. Can you use self._side_inputs_container 
instead of adding this new argument here?




Issue Time Tracking
---

Worklog Id: (was: 81484)
Time Spent: 2.5h  (was: 2h 20m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81487&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81487
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 02:26
Start Date: 17/Mar/18 02:26
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175245092
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/watermark_manager.py
 ##
 @@ -94,14 +95,14 @@ def get_watermarks(self, applied_ptransform):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):
 
 Review comment:
   How is this new argument `side_inputs_container` different from the one 
passed to the constructor above?
   
   Can you use `self._side_inputs_container` instead of adding this new 
argument here?
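
   The suggestion can be sketched as follows. This is a simplified stand-in, 
not the actual WatermarkManager: the class names `SideInputsContainerStub` and 
`WatermarkManagerSketch` are hypothetical, and the shortened method signatures 
are assumptions made for illustration.

```python
class SideInputsContainerStub(object):
    """Hypothetical stand-in that records the watermark updates it receives."""

    def __init__(self):
        self.updates = []

    def update_watermarks_for_transform(self, ptransform, watermark):
        self.updates.append((ptransform, watermark))
        return []  # no unblocked tasks in this sketch


class WatermarkManagerSketch(object):
    """Keeps the container passed to the constructor instead of a new argument."""

    def __init__(self, side_inputs_container):
        self._side_inputs_container = side_inputs_container

    def update_watermarks(self, applied_ptransform, watermark):
        # Uses the stored reference; callers do not pass the container again.
        return self._side_inputs_container.update_watermarks_for_transform(
            applied_ptransform, watermark)


container = SideInputsContainerStub()
manager = WatermarkManagerSketch(container)
tasks = manager.update_watermarks('some_transform', 10)
print(container.updates, tasks)
```

   Holding the reference once keeps the two objects from drifting apart, since 
there is no second container a caller could pass by mistake.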




Issue Time Tracking
---

Worklog Id: (was: 81487)
Time Spent: 2h 40m  (was: 2.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81486&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81486
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 02:26
Start Date: 17/Mar/18 02:26
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: 
update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175244875
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -56,6 +56,12 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements_string = (
+        ', '.join(
+            str(elm) for elm in self.elements) if self.elements else '')
 
 Review comment:
   Why do we use `''` here and `[]` below to represent an empty list?
   
   (It is fine if this is fixed in a follow up PR.)
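
   One way to make the two cases consistent is to drop the fallback entirely, 
since `str.join` over an empty iterable already returns an empty string. A 
minimal sketch, with `join_elements` as a hypothetical helper name rather than 
anything in the PR:

```python
def join_elements(elements):
    """Join elements for display; an empty input yields '' with no special case."""
    return ', '.join(str(elm) for elm in elements)


empty = join_elements([])
some = join_elements([2, 1, 4])
print(repr(empty), repr(some))
```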




Issue Time Tracking
---

Worklog Id: (was: 81486)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81482&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81482
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:42
Start Date: 17/Mar/18 01:42
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner 
(part I: update _SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838#discussion_r175244099
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
       return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81482)
Time Spent: 2h 10m  (was: 2h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81478&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81478
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:34
Start Date: 17/Mar/18 01:34
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243839
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -56,6 +56,11 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements = ', '.join([
+        str(elm) for elm in self.elements] if self.elements else [''])
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81478)
Time Spent: 1h 40m  (was: 1.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81477&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81477
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:34
Start Date: 17/Mar/18 01:34
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243837
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -67,8 +72,17 @@ class _SideInputsContainer(object):
   def __init__(self, views):
     self._lock = threading.Lock()
     self._views = {}
+    self._transform_to_views = collections.defaultdict(list)
+
     for view in views:
       self._views[view] = _SideInputView(view)
+      self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+    views = ', '.join([
+        str(elm) for elm in self._views.values()
+    ] if self._views.values() else [])
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81477)
Time Spent: 1.5h  (was: 1h 20m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81480&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81480
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:34
Start Date: 17/Mar/18 01:34
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243849
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,82 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result  # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements)  # pylint: disable=line-too-long, unused-variable
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 81480)
Time Spent: 2h  (was: 1h 50m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81479&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81479
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:34
Start Date: 17/Mar/18 01:34
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243844
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,82 @@ def fired_elements(elem):
 # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
 self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+global result # pylint: disable=global-variable-undefined
+result = []
+
+def recorded_elements(elem):
+  result.append(elem)
+  return elem
+
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+p = TestPipeline(options=options)
+
+main_stream = (p
+   | 'main TestStream' >> TestStream()
+   .advance_watermark_to(10)
+   .add_elements(['e']))
+side = (p
+| beam.Create([2, 1, 4])
+| beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+class RecordFn(beam.DoFn):
+  def process(self,
+  elm=beam.DoFn.ElementParam,
+  ts=beam.DoFn.TimestampParam,
+  side=beam.DoFn.SideInputParam):
+yield (elm, ts, side)
+
+records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable
+p.run()
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+global result # pylint: disable=global-variable-undefined
+result = []
+
+def recorded_elements(elem):
+  result.append(elem)
+  return elem
+
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+p = TestPipeline(options=options)
+
+main_stream = (p
+   | 'main TestStream' >> TestStream()
+   .advance_watermark_to(10)
+   .add_elements(['e'])
+   .advance_processing_time(11))
+# TODO(mariagh): Fix this
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 81479)
Time Spent: 1h 50m  (was: 1h 40m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81475&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81475
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:33
Start Date: 17/Mar/18 01:33
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243829
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +241,12 @@ def handle_result(
 self._side_inputs_container.add_values(
 view,
 committed_bundle.get_elements_iterable(make_copy=True))
-  if (self.get_execution_context(result.transform)
-  .watermarks.input_watermark
-  == WatermarkManager.WATERMARK_POS_INF):
-self._pending_unblocked_tasks.extend(
-self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+  tasks = self._watermark_manager.update_watermarks(
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 81475)
Time Spent: 1h 10m  (was: 1h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81476&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81476
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 01:34
Start Date: 17/Mar/18 01:34
Worklog Time Spent: 10m 
  Work Description: mariapython commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175243831
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
   view.has_result = True
   return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+# Collect tasks that get unblocked as the workflow progresses
 
 Review comment:
   Done


Issue Time Tracking
---

Worklog Id: (was: 81476)
Time Spent: 1h 20m  (was: 1h 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81452&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81452
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 17/Mar/18 00:13
Start Date: 17/Mar/18 00:13
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175239845
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
   view.has_result = True
   return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+# Collect tasks that get unblocked as the workflow progresses
+unblocked_tasks = []
+for view in self._transform_to_views[ptransform]:
+  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+unblocked_tasks = []
+if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
 
 Review comment:
   Thank you. Sounds good, let's do that.


Issue Time Tracking
---

Worklog Id: (was: 81452)
Time Spent: 1h  (was: 50m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81413
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 16/Mar/18 22:33
Start Date: 16/Mar/18 22:33
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175229449
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
   view.has_result = True
   return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+# Collect tasks that get unblocked as the workflow progresses
+unblocked_tasks = []
+for view in self._transform_to_views[ptransform]:
+  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+unblocked_tasks = []
+if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
 
 Review comment:
   Yes, this is part one of a refactor that will deliver the streaming side 
input functionality.  I've synced with Maria, and she has the remaining parts 
of the refactor to follow once this one goes in.  Perhaps we should clarify 
the title of the PR, though.


Issue Time Tracking
---

Worklog Id: (was: 81413)
Time Spent: 50m  (was: 40m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-16 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81410&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81410
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 16/Mar/18 22:31
Start Date: 16/Mar/18 22:31
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #4838: 
[BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r175229093
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
   view.has_result = True
   return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+# Collect tasks that get unblocked as the workflow progresses
+unblocked_tasks = []
+for view in self._transform_to_views[ptransform]:
+  unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+unblocked_tasks = []
+if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
 
 Review comment:
   Wouldn't waiting for `WatermarkManager.WATERMARK_POS_INF` unblock side 
inputs only once they are completely finished?
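
To make the concern concrete, here is a minimal sketch. It is not the DirectRunner implementation: the `(name, window_end)` task tuples and both function names are hypothetical, and only the idea of comparing against a positive-infinity watermark comes from the diff quoted above. It contrasts releasing blocked tasks only when the watermark reaches positive infinity with releasing each task once the watermark reaches the end of the window it waits on.

```python
import math

# Hypothetical stand-in for the runner's "producer is finished" watermark.
WATERMARK_POS_INF = math.inf


def unblocked_only_when_finished(blocked, watermark):
    """Release every blocked task, but only once the producer is finished."""
    if watermark == WATERMARK_POS_INF:
        released = list(blocked)
        blocked.clear()
        return released
    return []


def unblocked_as_watermark_advances(blocked, watermark):
    """Release each task once the watermark reaches its window end.

    Assumption for illustration: a task is a (name, window_end) tuple and
    becomes ready when window_end <= watermark.
    """
    released = [(name, end) for name, end in blocked if end <= watermark]
    blocked[:] = [(name, end) for name, end in blocked if end > watermark]
    return released


blocked = [('a', 10), ('b', 20)]
print(unblocked_only_when_finished(blocked, 15))     # []
print(unblocked_as_watermark_advances(blocked, 15))  # [('a', 10)]
```

With the first policy nothing is released at watermark 15, which is the batch-like behaviour the question points at; the second policy releases task `a` while `b` keeps waiting.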


Issue Time Tracking
---

Worklog Id: (was: 81410)
Time Spent: 40m  (was: 0.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80995
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956108
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -217,11 +241,12 @@ def handle_result(
 self._side_inputs_container.add_values(
 view,
 committed_bundle.get_elements_iterable(make_copy=True))
-  if (self.get_execution_context(result.transform)
-  .watermarks.input_watermark
-  == WatermarkManager.WATERMARK_POS_INF):
-self._pending_unblocked_tasks.extend(
-self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+  tasks = self._watermark_manager.update_watermarks(
 
 Review comment:
   
   Please add a comment noting that these tasks come from unblocked side inputs.


Issue Time Tracking
---

Worklog Id: (was: 80995)
Time Spent: 20m  (was: 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80996&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80996
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956111
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -56,6 +56,11 @@ def __init__(self, view):
 self.value = None
 self.has_result = False
 
+  def __repr__(self):
+elements = ', '.join([
+str(elm) for elm in self.elements] if self.elements else [''])
 
 Review comment:
   
   Can you do the following?
   
   ```
   elements_string = ', '.join(str(elm) for elm in self.elements) if self.elements else ''
   ```
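
For reference, a small self-contained sketch of the same idea. The class name and the repr format are hypothetical, chosen only for illustration. It passes a generator expression straight to `join()`, and it relies on the fact that `join()` over an empty iterable already returns an empty string, so the sketch leaves out the conditional.

```python
class SideInputViewSketch(object):
    """Hypothetical stand-in for a view that holds a list of elements."""

    def __init__(self, elements=None):
        self.elements = list(elements or [])

    def __repr__(self):
        # A generator expression avoids building an intermediate list, and
        # join() over an empty iterable returns '' without a special case.
        elements_string = ', '.join(str(elm) for elm in self.elements)
        return 'SideInputViewSketch(elements=[%s])' % elements_string


print(repr(SideInputViewSketch([2, 1, 4])))  # SideInputViewSketch(elements=[2, 1, 4])
print(repr(SideInputViewSketch()))           # SideInputViewSketch(elements=[])
```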


Issue Time Tracking
---

Worklog Id: (was: 80996)
Time Spent: 20m  (was: 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80998&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80998
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956113
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,82 @@ def fired_elements(elem):
 # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
 self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+global result # pylint: disable=global-variable-undefined
+result = []
+
+def recorded_elements(elem):
+  result.append(elem)
+  return elem
+
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+p = TestPipeline(options=options)
+
+main_stream = (p
+   | 'main TestStream' >> TestStream()
+   .advance_watermark_to(10)
+   .add_elements(['e']))
+side = (p
+| beam.Create([2, 1, 4])
+| beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+class RecordFn(beam.DoFn):
+  def process(self,
+  elm=beam.DoFn.ElementParam,
+  ts=beam.DoFn.TimestampParam,
+  side=beam.DoFn.SideInputParam):
+yield (elm, ts, side)
+
+records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable
+p.run()
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+global result # pylint: disable=global-variable-undefined
+result = []
+
+def recorded_elements(elem):
+  result.append(elem)
+  return elem
+
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+p = TestPipeline(options=options)
+
+main_stream = (p
+   | 'main TestStream' >> TestStream()
+   .advance_watermark_to(10)
+   .add_elements(['e'])
+   .advance_processing_time(11))
+# TODO(mariagh): Fix this
 
 Review comment:
   
   Please make this comment more detailed, or remove it.


Issue Time Tracking
---

Worklog Id: (was: 80998)
Time Spent: 20m  (was: 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81000&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81000
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956109
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -99,6 +113,19 @@ def finalize_value_and_get_tasks(self, side_input):
   view.has_result = True
   return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+# Collect tasks that get unblocked as the workflow progresses
 
 Review comment:
   
   nit: period at end of sentence.


Issue Time Tracking
---

Worklog Id: (was: 81000)
Time Spent: 0.5h  (was: 20m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80997&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80997
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956110
 
 

 ##
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##
 @@ -67,8 +72,17 @@ class _SideInputsContainer(object):
   def __init__(self, views):
 self._lock = threading.Lock()
 self._views = {}
+self._transform_to_views = collections.defaultdict(list)
+
 for view in views:
   self._views[view] = _SideInputView(view)
+  self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+views = ', '.join([
+str(elm) for elm in self._views.values()
+] if self._views.values() else [])
 
 Review comment:
   
   Same as above for `_SideInputView.__repr__`.


Issue Time Tracking
---

Worklog Id: (was: 80997)
Time Spent: 20m  (was: 10m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-15 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=80999&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80999
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 15/Mar/18 22:50
Start Date: 15/Mar/18 22:50
Worklog Time Spent: 10m 
  Work Description: charlesccychen commented on a change in pull request 
#4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#discussion_r174956114
 
 

 ##
 File path: sdks/python/apache_beam/testing/test_stream_test.py
 ##
 @@ -245,6 +247,82 @@ def fired_elements(elem):
 # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
 self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+global result # pylint: disable=global-variable-undefined
+result = []
+
+def recorded_elements(elem):
+  result.append(elem)
+  return elem
+
+options = PipelineOptions()
+options.view_as(StandardOptions).streaming = True
+p = TestPipeline(options=options)
+
+main_stream = (p
+   | 'main TestStream' >> TestStream()
+   .advance_watermark_to(10)
+   .add_elements(['e']))
+side = (p
+| beam.Create([2, 1, 4])
+| beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+class RecordFn(beam.DoFn):
+  def process(self,
+  elm=beam.DoFn.ElementParam,
+  ts=beam.DoFn.TimestampParam,
+  side=beam.DoFn.SideInputParam):
+yield (elm, ts, side)
+
+records = main_stream | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) | beam.Map(recorded_elements) # pylint: disable=line-too-long, unused-variable
 
 Review comment:
   
   Can you use the multi-line form to avoid the line-too-long?  You can also 
just avoid creating the `records` variable.
   
   ```
   (main_stream
    | beam.ParDo(RecordFn, ...)
    | beam.Map(...))
   ```
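
The layout itself does not depend on Beam. Below is a minimal sketch using a hypothetical `Step` class that overloads `|`, a stand-in written only for this illustration and not a Beam type. It shows that wrapping the chain in parentheses lets it span several lines without backslashes, and that the result need not be bound to a name when only the side effect matters.

```python
class Step(object):
    """Hypothetical stand-in for a pipeline stage that supports `|`."""

    def __init__(self, values):
        self.values = list(values)

    def __or__(self, fn):
        # Apply fn to every value and return a new Step so calls can chain.
        return Step(fn(v) for v in self.values)


recorded = []


def record(value):
    recorded.append(value)
    return value


# Parentheses allow one stage per line; no `records` variable is needed.
(Step([1, 2, 3])
 | (lambda v: v * 2)
 | record)

print(recorded)  # [2, 4, 6]
```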


Issue Time Tracking
---

Worklog Id: (was: 80999)
Time Spent: 0.5h  (was: 20m)

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.





[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

2018-03-09 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=79147&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79147
 ]

ASF GitHub Bot logged work on BEAM-3818:


Author: ASF GitHub Bot
Created on: 10/Mar/18 00:39
Start Date: 10/Mar/18 00:39
Worklog Time Spent: 10m 
  Work Description: mariapython commented on issue #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner
URL: https://github.com/apache/beam/pull/4838#issuecomment-371984800
 
 
   R: @charlesccychen 
   cc: @aaltay, @robertwb 


Issue Time Tracking
---

Worklog Id: (was: 79147)
Time Spent: 10m
Remaining Estimate: 0h

> Add support for the streaming side inputs in the Python DirectRunner
> 
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: María GH
>Assignee: María GH
>Priority: Minor
> Fix For: 3.0.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  
> Currently, side inputs are only available for globally-windowed side input 
> PCollections.
> Also, empty side inputs cause a pipeline stall.


