[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2020-01-06 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=366597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366597
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 06/Jan/20 12:21
Start Date: 06/Jan/20 12:21
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10246: [BEAM-7949] Add 
time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r363271841
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
 ##
 @@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',
 
 Review comment:
   +1 for unified experiment names across all languages.
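A minimal, self-contained sketch of the experiment lookup quoted above. It assumes the experiments arrive as a plain list of strings rather than through Beam's `DebugOptions`. The function name `parse_data_buffer_time_limit_ms` and the regex group name `limit` are hypothetical. Unlike the quoted code, this variant skips a malformed value (returning 0) instead of raising `ValueError`.

```python
import re
from typing import Iterable, Optional

# Hypothetical pattern: the group name 'limit' is illustrative, and only
# non-negative integer values are accepted.
_TIME_LIMIT_PATTERN = re.compile(r'data_buffer_time_limit_ms=(?P<limit>\d+)$')


def parse_data_buffer_time_limit_ms(
    experiments: Optional[Iterable[str]]) -> int:
  """Returns the outbound buffering time limit in ms, or 0 (disabled)."""
  for experiment in experiments or []:
    match = _TIME_LIMIT_PATTERN.match(experiment)
    if match:
      # Only one such flag is expected, so the first valid match wins.
      return int(match.group('limit'))
  # Flag absent or malformed: time-based flushing stays disabled.
  return 0


print(parse_data_buffer_time_limit_ms(
    ['beam_fn_api', 'data_buffer_time_limit_ms=100']))  # prints 100
```

Anchoring the pattern with `re.match` and `$` means a flag such as `xdata_buffer_time_limit_ms=5` or `data_buffer_time_limit_ms=5ms` is ignored rather than partially parsed.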
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 366597)
Time Spent: 4.5h  (was: 4h 20m)

> Add time-based cache threshold support in the data service of the Python SDK 
> harness
> 
>
> Key: BEAM-7949
> URL: https://issues.apache.org/jira/browse/BEAM-7949
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-harness
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
> Fix For: 2.19.0
>
>  Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> Currently, only a size-based cache threshold is supported in the data service
> of the Python SDK harness. It should also support a time-based cache
> threshold. This is especially important for streaming jobs, which are
> sensitive to latency.
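The two thresholds in this issue can be illustrated with a minimal sketch of a buffer that flushes when either its size or the age of its oldest byte crosses a limit. It is not the harness implementation: `SizeOrAgeBuffer` and its parameters are hypothetical, and the thresholds are only checked on `write()`, so a buffer that stops receiving data is never flushed. That gap is why a time-based threshold needs a timer, which is what `TimeBasedBufferingClosableOutputStream` in PR #10246 schedules.

```python
import time
from typing import Callable, List, Optional


class SizeOrAgeBuffer:
  """Hypothetical buffer that flushes on a size OR an age threshold.

  Thresholds are checked only inside write(), so a buffer that stops
  receiving data is never flushed by this class alone.
  """

  def __init__(self,
               flush_callback: Callable[[bytes], None],
               size_threshold: int,
               time_threshold_ms: int,
               clock: Callable[[], float] = time.monotonic) -> None:
    self._flush_callback = flush_callback
    self._size_threshold = size_threshold
    self._time_threshold_s = time_threshold_ms / 1000.0
    self._clock = clock
    self._chunks: List[bytes] = []
    self._size = 0
    self._first_write: Optional[float] = None  # clock time of oldest write

  def write(self, data: bytes) -> None:
    if not self._chunks:
      self._first_write = self._clock()
    self._chunks.append(data)
    self._size += len(data)
    too_big = self._size > self._size_threshold
    too_old = self._clock() - self._first_write >= self._time_threshold_s
    if too_big or too_old:
      self.flush()

  def flush(self) -> None:
    if self._chunks:
      self._flush_callback(b''.join(self._chunks))
    self._chunks = []
    self._size = 0
    self._first_write = None


flushed: List[bytes] = []
now = [0.0]  # fake clock, in seconds, so the example is deterministic
buf = SizeOrAgeBuffer(flushed.append, size_threshold=10,
                      time_threshold_ms=50, clock=lambda: now[0])
buf.write(b'abc')          # 3 bytes, 0 ms old: kept
now[0] = 0.06              # 60 ms pass
buf.write(b'de')           # oldest byte is 60 ms old: flushes b'abcde'
buf.write(b'0123456789x')  # 11 bytes > 10: flushes at once
print(flushed)             # [b'abcde', b'0123456789x']
```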



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2020-01-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=366496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366496
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 06/Jan/20 06:41
Start Date: 06/Jan/20 06:41
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on pull request #10246: 
[BEAM-7949] Add time-based cache threshold support in the data service of the 
Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r363174479
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
 ##
 @@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',
 
 Review comment:
   Makes sense to me. I have created a PR 
https://github.com/apache/beam/pull/10505 as a follow-up. It changes the config 
key "beam_fn_api_data_buffer_time_limit" in the Java SDK harness to 
"data_buffer_time_limit_ms". Please note that the config key 
"beam_fn_api_data_buffer_size_limit" in the Java SDK harness is also changed to 
"data_buffer_size_limit" to keep the naming convention consistent.
 
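With both SDK harnesses reading the same keys, one generic `key=value` lookup can serve every data-buffer experiment. This is a sketch assuming experiments are plain strings; `experiment_values` and `get_int_experiment` are hypothetical helpers, not Beam APIs, and the size-limit key is included only to show the shared naming convention.

```python
from typing import Dict, Iterable, Optional

# Unified names per the follow-up PR; on the Java side they replace
# beam_fn_api_data_buffer_time_limit and beam_fn_api_data_buffer_size_limit.
DATA_BUFFER_TIME_LIMIT_MS = 'data_buffer_time_limit_ms'
DATA_BUFFER_SIZE_LIMIT = 'data_buffer_size_limit'


def experiment_values(experiments: Optional[Iterable[str]]) -> Dict[str, str]:
  """Splits 'key=value' experiments into a dict; bare flags map to ''."""
  values: Dict[str, str] = {}
  for experiment in experiments or []:
    key, _, value = experiment.partition('=')
    values.setdefault(key, value)  # keep the first value if a key repeats
  return values


def get_int_experiment(experiments: Optional[Iterable[str]],
                       key: str, default: int) -> int:
  """Returns the integer value of `key`, or `default` if absent or empty."""
  value = experiment_values(experiments).get(key)
  return int(value) if value else default


flags = ['beam_fn_api', 'data_buffer_time_limit_ms=100',
         'data_buffer_size_limit=1000000']
print(get_int_experiment(flags, DATA_BUFFER_TIME_LIMIT_MS, 0))  # prints 100
print(get_int_experiment(flags, DATA_BUFFER_SIZE_LIMIT, 0))  # prints 1000000
```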



Issue Time Tracking
---

Worklog Id: (was: 366496)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2020-01-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=365440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365440
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 02/Jan/20 18:15
Start Date: 02/Jan/20 18:15
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362572952
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
 ##
 @@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',
 
 Review comment:
   I think keeping these options consistent is important so that users have a 
good experience across languages. Changing the Java ones to match the Python 
ones would be fine as well, since the Java ones don't seem to follow an 
explicit naming convention.
 



Issue Time Tracking
---

Worklog Id: (was: 365440)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=364948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364948
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 31/Dec/19 10:14
Start Date: 31/Dec/19 10:14
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on pull request #10246: 
[BEAM-7949] Add time-based cache threshold support in the data service of the 
Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362187710
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
 ##
 @@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',
 
 Review comment:
   I also thought about this question when preparing this PR. The reason I 
didn't do it in this PR is that most config keys in the Java SDK harness 
start with "beam_fn_api_", which is not the case for the config keys in the 
Python SDK harness.
I'm fine with unifying the config keys if you don't see a problem with that. 
   What are your thoughts? @lukecwik @mxm 
 



Issue Time Tracking
---

Worklog Id: (was: 364948)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=364700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364700
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 30/Dec/19 18:08
Start Date: 30/Dec/19 18:08
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362056607
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py
 ##
 @@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+    data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',
 
 Review comment:
   Should we be making sure that the experiments have the same string constant 
across languages 
([beam_fn_api_data_buffer_time_limit](https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java#L39))?
 



Issue Time Tracking
---

Worklog Id: (was: 364700)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-25 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=363271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-363271
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 25/Dec/19 12:43
Start Date: 25/Dec/19 12:43
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10246: [BEAM-7949] Add 
time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 363271)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=362461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362461
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 23/Dec/19 08:53
Start Date: 23/Dec/19 08:53
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-568407237
 
 
   Run Python2_PVR_Flink PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 362461)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=362011&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362011
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 21/Dec/19 01:58
Start Date: 21/Dec/19 01:58
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-568142289
 
 
   Thanks for your great comments, I have updated the PR accordingly. ;)
 



Issue Time Tracking
---

Worklog Id: (was: 362011)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361608
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 20/Dec/19 11:55
Start Date: 20/Dec/19 11:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10246: [BEAM-7949] Add 
time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r360343183
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -69,24 +70,90 @@
 class ClosableOutputStream(OutputStream):
   """A Outputstream for use with CoderImpls that has a close() method."""
 
+  def __init__(self, close_callback=None):
+    super(ClosableOutputStream, self).__init__()
+    self._close_callback = close_callback
+
+  def close(self):
+    if self._close_callback:
+      self._close_callback(self.get())
+
+  @staticmethod
+  def create(close_callback,
+             flush_callback,
+             data_buffer_time_limit_ms):
+    if data_buffer_time_limit_ms > 0:
+      return TimeBasedBufferingClosableOutputStream(
+          close_callback,
+          flush_callback=flush_callback,
+          time_flush_threshold_ms=data_buffer_time_limit_ms)
+    else:
+      return SizeBasedBufferingClosableOutputStream(
+          close_callback, flush_callback=flush_callback)
+
+
+class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
+  """A size-based buffering OutputStream."""
+
   def __init__(self,
                close_callback=None,  # type: Optional[Callable[[bytes], None]]
                flush_callback=None,  # type: Optional[Callable[[bytes], None]]
-               flush_threshold=_DEFAULT_FLUSH_THRESHOLD):
-    super(ClosableOutputStream, self).__init__()
-    self._close_callback = close_callback
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
+    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
     self._flush_callback = flush_callback
-    self._flush_threshold = flush_threshold
+    self._size_flush_threshold = size_flush_threshold
 
   # This must be called explicitly to avoid flushing partial elements.
   def maybe_flush(self):
-    if self._flush_callback and self.size() > self._flush_threshold:
+    if self.size() > self._size_flush_threshold:
+      self.flush()
+
+  def flush(self):
+    if self._flush_callback:
       self._flush_callback(self.get())
       self._clear()
 
+
+class TimeBasedBufferingClosableOutputStream(
+    SizeBasedBufferingClosableOutputStream):
+  """A buffering OutputStream with both time-based and size-based."""
+
+  def __init__(self,
+               close_callback=None,
+               flush_callback=None,
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD,
+               time_flush_threshold_ms=_DEFAULT_TIME_FLUSH_THRESHOLD_MS):
+    super(TimeBasedBufferingClosableOutputStream, self).__init__(
+        close_callback, flush_callback, size_flush_threshold)
+    assert time_flush_threshold_ms > 0
+    self._time_flush_threshold_ms = time_flush_threshold_ms
+    self._flush_lock = threading.Lock()
+    self._schedule_lock = threading.Lock()
+    self._closed = False
+    self._schedule_periodic_flush()
+
+  def flush(self):
+    with self._flush_lock:
+      super(TimeBasedBufferingClosableOutputStream, self).flush()
+
   def close(self):
-    if self._close_callback:
-      self._close_callback(self.get())
+    with self._schedule_lock:
+      self._closed = True
+      if self._flush_timer:
+        self._flush_timer.cancel()
+        self._flush_timer = None
+    super(TimeBasedBufferingClosableOutputStream, self).close()
+
+  def _schedule_periodic_flush(self):
+    def _periodic_flush():
+      with self._schedule_lock:
+        if not self._closed:
+          self.flush()
+          self._schedule_periodic_flush()
 
 Review comment:
   Also, this creates a `Thread` for every flush. We might just want to use a 
single thread, e.g. https://stackoverflow.com/a/12435256/2225100
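One way to follow this suggestion, along the lines of the linked answer, is a single daemon thread that loops on `threading.Event.wait()` and flushes each time the wait times out. `PeriodicFlusher` is a hypothetical name, not part of Beam. The stream's own `flush()` would still need a lock, as `_flush_lock` provides in the diff, because the writer thread and this thread can both call it.

```python
import threading
import time
from typing import Callable, List


class PeriodicFlusher:
  """Hypothetical helper: calls flush_fn every interval_ms on one thread.

  Event.wait(timeout) returns False when the timeout elapses and True once
  stop() sets the event, so one long-lived daemon thread replaces the
  per-flush threading.Timer.
  """

  def __init__(self, flush_fn: Callable[[], None], interval_ms: int) -> None:
    assert interval_ms > 0
    self._flush_fn = flush_fn
    self._interval_s = interval_ms / 1000.0
    self._stopped = threading.Event()
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

  def _run(self) -> None:
    while not self._stopped.wait(self._interval_s):
      self._flush_fn()

  def stop(self) -> None:
    """Stops the loop; no flush_fn call starts after this returns."""
    self._stopped.set()
    self._thread.join()


ticks: List[float] = []
flusher = PeriodicFlusher(lambda: ticks.append(time.monotonic()),
                          interval_ms=20)
time.sleep(0.1)
flusher.stop()
```

Compared with re-creating a `threading.Timer` after every flush, this keeps thread creation to one per stream, and `stop()` doubles as the cancellation path that `close()` needs.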
 



Issue Time Tracking
---

Worklog Id: (was: 361608)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361609
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 20/Dec/19 11:55
Start Date: 20/Dec/19 11:55
Worklog Time Spent: 10m 
  Work Description: mxm commented on pull request #10246: [BEAM-7949] Add 
time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r360341553
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -69,24 +70,90 @@
 class ClosableOutputStream(OutputStream):
   """A Outputstream for use with CoderImpls that has a close() method."""
 
+  def __init__(self, close_callback=None):
+    super(ClosableOutputStream, self).__init__()
+    self._close_callback = close_callback
+
+  def close(self):
+    if self._close_callback:
+      self._close_callback(self.get())
+
+  @staticmethod
+  def create(close_callback,
+             flush_callback,
+             data_buffer_time_limit_ms):
+    if data_buffer_time_limit_ms > 0:
+      return TimeBasedBufferingClosableOutputStream(
+          close_callback,
+          flush_callback=flush_callback,
+          time_flush_threshold_ms=data_buffer_time_limit_ms)
+    else:
+      return SizeBasedBufferingClosableOutputStream(
+          close_callback, flush_callback=flush_callback)
+
+
+class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
+  """A size-based buffering OutputStream."""
+
   def __init__(self,
                close_callback=None,  # type: Optional[Callable[[bytes], None]]
                flush_callback=None,  # type: Optional[Callable[[bytes], None]]
-               flush_threshold=_DEFAULT_FLUSH_THRESHOLD):
-    super(ClosableOutputStream, self).__init__()
-    self._close_callback = close_callback
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
+    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
     self._flush_callback = flush_callback
-    self._flush_threshold = flush_threshold
+    self._size_flush_threshold = size_flush_threshold
 
   # This must be called explicitly to avoid flushing partial elements.
   def maybe_flush(self):
-    if self._flush_callback and self.size() > self._flush_threshold:
+    if self.size() > self._size_flush_threshold:
+      self.flush()
+
+  def flush(self):
+    if self._flush_callback:
       self._flush_callback(self.get())
       self._clear()
 
+
+class TimeBasedBufferingClosableOutputStream(
+    SizeBasedBufferingClosableOutputStream):
+  """A buffering OutputStream with both time-based and size-based."""
+
+  def __init__(self,
+               close_callback=None,
+               flush_callback=None,
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD,
+               time_flush_threshold_ms=_DEFAULT_TIME_FLUSH_THRESHOLD_MS):
+    super(TimeBasedBufferingClosableOutputStream, self).__init__(
+        close_callback, flush_callback, size_flush_threshold)
+    assert time_flush_threshold_ms > 0
+    self._time_flush_threshold_ms = time_flush_threshold_ms
+    self._flush_lock = threading.Lock()
+    self._schedule_lock = threading.Lock()
+    self._closed = False
+    self._schedule_periodic_flush()
+
+  def flush(self):
+    with self._flush_lock:
+      super(TimeBasedBufferingClosableOutputStream, self).flush()
+
   def close(self):
-    if self._close_callback:
-      self._close_callback(self.get())
+    with self._schedule_lock:
+      self._closed = True
+      if self._flush_timer:
+        self._flush_timer.cancel()
+        self._flush_timer = None
+    super(TimeBasedBufferingClosableOutputStream, self).close()
+
+  def _schedule_periodic_flush(self):
+    def _periodic_flush():
+      with self._schedule_lock:
+        if not self._closed:
+          self.flush()
+          self._schedule_periodic_flush()
 
 Review comment:
   Don't we have to set the timer before flushing? Otherwise we likely set the 
timer every `flush_threshold + flush_time`, where `flush_time` might be varying.
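Re-arming the timer before flushing, as suggested here, keeps the period close to the threshold. A related option is to schedule against fixed deadlines `start + k * period`, so neither the flush time nor its variation accumulates. The sketch assumes a single background thread; `next_deadline` and `FixedRateFlusher` are hypothetical names, not Beam APIs.

```python
import math
import threading
import time
from typing import Callable


def next_deadline(start: float, period: float, now: float) -> float:
  """Returns the first start + k * period (k >= 1) strictly after now."""
  k = max(1, math.floor((now - start) / period) + 1)
  return start + k * period


class FixedRateFlusher:
  """Hypothetical fixed-rate flusher: deadlines are anchored to `start`.

  Each wait is computed from the schedule rather than from the end of the
  previous flush, so the flush duration does not stretch the period.
  Deadlines missed during a slow flush are skipped, not replayed.
  """

  def __init__(self, flush_fn: Callable[[], None], interval_ms: int) -> None:
    assert interval_ms > 0
    self._flush_fn = flush_fn
    self._period = interval_ms / 1000.0
    self._stopped = threading.Event()
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

  def _run(self) -> None:
    start = time.monotonic()
    deadline = start + self._period
    while not self._stopped.wait(max(0.0, deadline - time.monotonic())):
      self._flush_fn()
      deadline = next_deadline(start, self._period, time.monotonic())

  def stop(self) -> None:
    self._stopped.set()
    self._thread.join()


print(next_deadline(0, 50, 70))   # prints 100
print(next_deadline(0, 50, 120))  # prints 150
```

Skipping missed deadlines avoids a burst of back-to-back flushes after a slow one; replaying them would only add latency-neutral extra calls on an already drained buffer.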
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 361609)
Time Spent: 3h 10m  (was: 3h)


[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361582&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361582
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 20/Dec/19 10:36
Start Date: 20/Dec/19 10:36
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-567878913
 
 
   Sure, I have squashed the commits into one. :)
 



Issue Time Tracking
---

Worklog Id: (was: 361582)
Time Spent: 3h  (was: 2h 50m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361539
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 20/Dec/19 09:22
Start Date: 20/Dec/19 09:22
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #10246: [BEAM-7949] Add 
time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-567854991
 
 
   Looks good. @sunjincheng121 Could you squash the commits?
 



Issue Time Tracking
---

Worklog Id: (was: 361539)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361334
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 18/Dec/19 08:30
Start Date: 18/Dec/19 08:30
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-566930239
 
 
   Thanks for the review @robertwb. I have updated the PR accordingly.
 



Issue Time Tracking
---

Worklog Id: (was: 361334)
Time Spent: 2h 40m  (was: 2.5h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361333
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 18/Dec/19 08:26
Start Date: 18/Dec/19 08:26
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on pull request #10246: 
[BEAM-7949] Add time-based cache threshold support in the data service of the 
Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r359209509
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
 
+# Time-based flush is enabled in the fn_api_runner for testing.
+DATA_BUFFER_TIME_LIMIT = 1000
 
 Review comment:
   Yes, it's in milliseconds.
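Since the constant is in milliseconds, carrying the unit in the name (as the `data_buffer_time_limit_ms` experiment discussed for `sdk_worker_main.py` in this PR does) avoids the ambiguity raised here. A minimal sketch, assuming the value arrives as a `data_buffer_time_limit_ms=<int>` experiment string and that 0 means disabled; `parse_time_limit_ms` and `ms_to_seconds` are hypothetical helpers, not the PR's code.

```python
from typing import Iterable, Optional

# Hypothetical constant: the experiment prefix carrying the limit in ms.
_TIME_LIMIT_FLAG = 'data_buffer_time_limit_ms='


def parse_time_limit_ms(experiments: Optional[Iterable[str]]) -> int:
  """Returns the outbound buffer time limit in milliseconds (0 = disabled)."""
  for experiment in experiments or []:
    if experiment.startswith(_TIME_LIMIT_FLAG):
      # Only the first matching experiment is used.
      return int(experiment[len(_TIME_LIMIT_FLAG):])
  return 0


def ms_to_seconds(limit_ms: int) -> float:
  """Converts milliseconds to the seconds reported by time.monotonic()."""
  return limit_ms / 1000.0
```

For example, `parse_time_limit_ms(['data_buffer_time_limit_ms=1000'])` returns `1000`, and `ms_to_seconds(1000)` returns `1.0`.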
 



Issue Time Tracking
---

Worklog Id: (was: 361333)
Time Spent: 2.5h  (was: 2h 20m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360565&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360565
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358508847
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
 
+# Time-based flush is enabled in the fn_api_runner for testing.
 
 Review comment:
   It seems this is used for more than just testing.
 



Issue Time Tracking
---

Worklog Id: (was: 360565)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360570
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358511055
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -292,8 +357,15 @@ def close_callback(data):
               instruction_id=instruction_id,
               transform_id=transform_id,
               data=b''))
-    return ClosableOutputStream(
-        close_callback, flush_callback=add_to_send_queue)
+
+    if self._data_buffer_time_limit > 0:
+      return TimeBasedBufferingClosableOutputStream(
 
 Review comment:
   Continuing on the comment above, maybe make a helper to avoid logic 
duplication. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 360570)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360568&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360568
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358508686
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
 
+# Time-based flush is enabled in the fn_api_runner for testing.
+DATA_BUFFER_TIME_LIMIT = 1000
 
 Review comment:
   Is there a time unit here? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 360568)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360567
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358510103
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -69,24 +70,78 @@
 class ClosableOutputStream(OutputStream):
   """A Outputstream for use with CoderImpls that has a close() method."""
 
+  def __init__(self, close_callback=None):
+    super(ClosableOutputStream, self).__init__()
+    self._close_callback = close_callback
+
+  def close(self):
+    if self._close_callback:
+      self._close_callback(self.get())
+
+
+class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
+  """A size-based buffering OutputStream."""
+
   def __init__(self,
                close_callback=None,  # type: Optional[Callable[[bytes], None]]
                flush_callback=None,  # type: Optional[Callable[[bytes], None]]
-               flush_threshold=_DEFAULT_FLUSH_THRESHOLD):
-    super(ClosableOutputStream, self).__init__()
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
+    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
     self._close_callback = close_callback
 
 Review comment:
   Redundant with `super.__init__()`?
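The point is that `ClosableOutputStream.__init__` already stores `close_callback`, so the subclass does not need to assign it again. A minimal sketch, with a `bytearray`-backed stand-in for Beam's `OutputStream` (an assumption so the example runs without Beam) and an assumed 10 MiB default threshold; the class names mirror the diff, but the bodies are simplified.

```python
from typing import Callable, Optional

Callback = Optional[Callable[[bytes], None]]


class OutputStream(object):
  """Stand-in for Beam's OutputStream: an append-only byte buffer."""

  def __init__(self):
    self._data = bytearray()

  def write(self, b):
    self._data.extend(b)

  def get(self):
    return bytes(self._data)


class ClosableOutputStream(OutputStream):
  """An OutputStream that hands its contents to a callback on close()."""

  def __init__(self, close_callback=None):
    # type: (Callback) -> None
    super(ClosableOutputStream, self).__init__()
    self._close_callback = close_callback

  def close(self):
    if self._close_callback:
      self._close_callback(self.get())


class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
  """A size-based buffering OutputStream."""

  def __init__(self,
               close_callback=None,  # type: Callback
               flush_callback=None,  # type: Callback
               size_flush_threshold=10 << 20):  # assumed 10 MiB default
    # The base class already stores close_callback; no second assignment.
    super(SizeBasedBufferingClosableOutputStream,
          self).__init__(close_callback)
    self._flush_callback = flush_callback
    self._size_flush_threshold = size_flush_threshold
```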
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 360567)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360566
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358509256
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
 ##
 @@ -1581,15 +1585,20 @@ def __init__(self,
     super(EmbeddedGrpcWorkerHandler, self).__init__(state, provision_info,
                                                     grpc_server)
     if payload:
-      state_cache_size = payload.decode('ascii')
+      state_cache_size, data_buffer_time_limit = \
+          payload.decode('ascii').split(',')
 
 Review comment:
   Here we're changing the payload in a backwards-incompatible way. Maybe at
this point we should make it a JSON dict?
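A sketch of the JSON-dict suggestion. The keys `state_cache_size` and `data_buffer_time_limit_ms` and both function names are hypothetical; the fallback branch still accepts the old payload, a bare integer string such as `b'100'`, so the change stays backwards compatible.

```python
import json
from typing import Tuple


def encode_payload(state_cache_size: int,
                   data_buffer_time_limit_ms: int) -> bytes:
  """Serializes the worker configuration as a JSON object."""
  return json.dumps({
      'state_cache_size': state_cache_size,
      'data_buffer_time_limit_ms': data_buffer_time_limit_ms,
  }).encode('ascii')


def decode_payload(payload: bytes) -> Tuple[int, int]:
  """Returns (state_cache_size, data_buffer_time_limit_ms)."""
  text = payload.decode('ascii')
  try:
    config = json.loads(text)
  except ValueError:  # json.JSONDecodeError is a subclass of ValueError.
    config = None
  if not isinstance(config, dict):
    # Legacy format: the payload was only the state cache size, e.g. b'100'.
    return int(text), 0
  # Missing keys fall back to 0 (disabled).
  return (int(config.get('state_cache_size', 0)),
          int(config.get('data_buffer_time_limit_ms', 0)))
```

New keys can then be added later without breaking older payloads, since absent keys default to 0.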
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 360566)
Time Spent: 2h 10m  (was: 2h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-16 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360569
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358510692
 
 

 ##
 File path: sdks/python/apache_beam/runners/worker/data_plane.py
 ##
 @@ -195,8 +252,14 @@ def add_to_inverse_output(data):
               instruction_id=instruction_id,
               transform_id=transform_id,
               data=data))
-    return ClosableOutputStream(
-        add_to_inverse_output, flush_callback=add_to_inverse_output)
+    if self._data_buffer_time_limit > 0:
 
 Review comment:
   It'd be better if one didn't have to choose one or the other. Would it be 
possible to compose them with wrappers? Otherwise, I'd make them the same class 
with both capabilities (optionally configured). 
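A sketch of the second option, one class with both thresholds, each optionally enabled. Assumptions: `BufferingOutputStream` is a hypothetical name, the clock is injectable so the behavior can be checked without sleeping, and the time limit is only checked on `write()`, so an idle stream does not flush by itself (that would need a background timer).

```python
import time
from typing import Callable


class BufferingOutputStream(object):
  """Buffers bytes; flushes on a size threshold, a time limit, or both."""

  def __init__(self,
               flush_callback: Callable[[bytes], None],
               size_flush_threshold: int = 0,     # 0 disables the size check
               time_flush_threshold_ms: int = 0,  # 0 disables the time check
               clock: Callable[[], float] = time.monotonic):
    self._flush_callback = flush_callback
    self._size_threshold = size_flush_threshold
    self._time_threshold_s = time_flush_threshold_ms / 1000.0
    self._clock = clock
    self._buffer = bytearray()
    self._last_flush = clock()

  def write(self, b: bytes) -> None:
    self._buffer.extend(b)
    if self._should_flush():
      self.flush()

  def _should_flush(self) -> bool:
    if self._size_threshold and len(self._buffer) >= self._size_threshold:
      return True
    if self._time_threshold_s and (
        self._clock() - self._last_flush >= self._time_threshold_s):
      return True
    return False

  def flush(self) -> None:
    if self._buffer:
      self._flush_callback(bytes(self._buffer))
      self._buffer = bytearray()
    self._last_flush = self._clock()
```

Keeping both checks in one class means callers never have to pick a stream type; passing 0 for a threshold simply turns that check off.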
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 360569)
Time Spent: 2h 20m  (was: 2h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360029
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 16/Dec/19 01:45
Start Date: 16/Dec/19 01:45
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-565873164
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 360029)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=359081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359081
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 13/Dec/19 02:25
Start Date: 13/Dec/19 02:25
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-565273080
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 359081)
Time Spent: 1h 40m  (was: 1.5h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358668&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358668
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 12/Dec/19 12:50
Start Date: 12/Dec/19 12:50
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564993879
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 358668)
Time Spent: 1.5h  (was: 1h 20m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358402
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 12/Dec/19 07:49
Start Date: 12/Dec/19 07:49
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564890797
 
 
   Rebased the code. :)
 



Issue Time Tracking
---

Worklog Id: (was: 358402)
Time Spent: 1h 10m  (was: 1h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-11 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358405
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 12/Dec/19 07:49
Start Date: 12/Dec/19 07:49
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564891058
 
 
   R: @robertwb I'd appreciate it if you could take a quick look. :)
 



Issue Time Tracking
---

Worklog Id: (was: 358405)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-09 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=356151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356151
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 09/Dec/19 13:16
Start Date: 09/Dec/19 13:16
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-563232918
 
 
   Thanks for the review, @mxm!
   
 



Issue Time Tracking
---

Worklog Id: (was: 356151)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-08 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=355980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355980
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 09/Dec/19 07:38
Start Date: 09/Dec/19 07:38
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-563105146
 
 
   Hi @mxm, I definitely agree that a lower bundle timeout would reduce
latency. However, I'm not sure it's the best approach for all use cases and
users, because finishing a bundle has overhead: all the state cached in the SDK
harness is flushed back to the runner whenever a bundle finishes. The lower the
bundle timeout, the more unnecessary state traffic between the runner and the
SDK harness.
   
   The solution proposed in this PR (periodic flush) can avoid such problems 
while still lowering the latency.
   
   A time-based cache threshold is already supported in the Java data service
(#9949). This PR adds similar functionality to the data service of the Python
SDK harness.
   
   What do you think?
   
   Best,
   Jincheng
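The periodic flush proposed in this comment can be sketched as a daemon thread that flushes whatever is buffered every `interval_ms`, independent of bundle boundaries, so output leaves the SDK harness even when no new elements arrive. This is a minimal illustration using only the standard library; `PeriodicFlusher` is a hypothetical name, not the PR's class, and the flush callback runs under the lock so chunks are emitted in write order.

```python
import threading
from typing import Callable


class PeriodicFlusher(object):
  """Buffers bytes and flushes them from a background thread every interval."""

  def __init__(self, flush_callback: Callable[[bytes], None],
               interval_ms: int):
    self._flush_callback = flush_callback
    self._interval_s = interval_ms / 1000.0
    self._buffer = bytearray()
    self._lock = threading.Lock()
    self._stopped = threading.Event()
    self._thread = threading.Thread(target=self._run, daemon=True)
    self._thread.start()

  def write(self, b: bytes) -> None:
    with self._lock:
      self._buffer.extend(b)

  def flush(self) -> None:
    # The callback runs under the lock so chunks keep their write order.
    with self._lock:
      if self._buffer:
        self._flush_callback(bytes(self._buffer))
        self._buffer = bytearray()

  def _run(self) -> None:
    # Event.wait returns False on timeout and True once close() sets it.
    while not self._stopped.wait(self._interval_s):
      self.flush()

  def close(self) -> None:
    self._stopped.set()
    self._thread.join()
    self.flush()  # Emit anything written after the last periodic flush.
```

Calling `close()` stops the timer thread and then emits any remaining bytes, so nothing buffered is lost when the stream ends.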
 



Issue Time Tracking
---

Worklog Id: (was: 355980)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=352471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352471
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 03/Dec/19 07:16
Start Date: 03/Dec/19 07:16
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-561034383
 
 
   R: @lukecwik @mxm 
 



Issue Time Tracking
---

Worklog Id: (was: 352471)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-11-30 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351608
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 01/Dec/19 01:39
Start Date: 01/Dec/19 01:39
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-560037602
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 351608)
Time Spent: 0.5h  (was: 20m)

> Add time-based cache threshold support in the data service of the Python SDK 
> harness
> 
>
> Key: BEAM-7949
> URL: https://issues.apache.org/jira/browse/BEAM-7949
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-harness
>Reporter: sunjincheng
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently, only a size-based cache threshold is supported in the data service
> of the Python SDK harness. It should also support a time-based cache threshold.
> This is especially important for streaming jobs, which are sensitive to
> latency.





[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351177&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351177
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 28/Nov/19 23:49
Start Date: 28/Nov/19 23:49
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] 
Add time-based cache threshold support in the data service of the Python SDK 
harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-559620534
 
 
   Run Python PreCommit
   
 



Issue Time Tracking
---

Worklog Id: (was: 351177)
Time Spent: 20m  (was: 10m)

> Add time-based cache threshold support in the data service of the Python SDK 
> harness
> 
>
> Key: BEAM-7949
> URL: https://issues.apache.org/jira/browse/BEAM-7949
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-harness
>Reporter: sunjincheng
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently, only a size-based cache threshold is supported in the data service
> of the Python SDK harness. It should also support a time-based cache threshold.
> This is especially important for streaming jobs, which are sensitive to
> latency.





[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness

2019-11-28 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351095&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351095
 ]

ASF GitHub Bot logged work on BEAM-7949:


Author: ASF GitHub Bot
Created on: 28/Nov/19 18:11
Start Date: 28/Nov/19 18:11
Worklog Time Spent: 10m 
  Work Description: sunjincheng121 commented on pull request #10246: 
[BEAM-7949] Add time-based cache threshold support in the data service of the 
Python SDK harness
URL: https://github.com/apache/beam/pull/10246
 
 
   Currently, only a size-based cache threshold is supported in the data service
of the Python SDK harness. This PR adds support for a time-based cache threshold
in the Python SDK harness. This is especially useful for streaming jobs, which
are sensitive to latency.
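   To illustrate the idea of combining the two thresholds, here is a minimal sketch, assuming a buffer that accumulates outbound byte chunks and hands them to a caller-supplied `flush_fn`. The class `TimedBuffer` and its parameters `size_limit_bytes` and `time_limit_ms` are hypothetical names chosen for illustration; this is not the Beam SDK harness implementation. Data is flushed as soon as the size threshold is reached, and a background thread also flushes whatever is buffered every `time_limit_ms` milliseconds, so a slow trickle of elements is not held back indefinitely.

```python
import threading
import time


class TimedBuffer(object):
  """Hypothetical buffer that flushes on a size OR a time threshold.

  Illustrative sketch only; not the Beam SDK harness implementation.
  """

  def __init__(self, flush_fn, size_limit_bytes, time_limit_ms=0):
    self._flush_fn = flush_fn                  # receives the joined bytes
    self._size_limit = size_limit_bytes        # size-based threshold
    self._time_limit = time_limit_ms / 1000.0  # 0 disables time-based flushing
    self._chunks = []
    self._size = 0
    self._lock = threading.Lock()
    self._closed = threading.Event()
    self._thread = None
    if self._time_limit > 0:
      # A background thread is needed so buffered data is flushed even when
      # no further writes arrive to trigger the size check.
      self._thread = threading.Thread(target=self._periodic_flush)
      self._thread.daemon = True
      self._thread.start()

  def write(self, data):
    with self._lock:
      self._chunks.append(data)
      self._size += len(data)
      if self._size >= self._size_limit:
        self._flush_locked()

  def _flush_locked(self):
    # Caller must hold self._lock. For simplicity, flush_fn runs under it.
    if self._chunks:
      self._flush_fn(b''.join(self._chunks))
      self._chunks = []
      self._size = 0

  def _periodic_flush(self):
    # Event.wait() returns False on timeout and True once close() sets it.
    while not self._closed.wait(self._time_limit):
      with self._lock:
        self._flush_locked()

  def close(self):
    # Stop the timer thread, then flush any remaining data.
    self._closed.set()
    if self._thread is not None:
      self._thread.join()
    with self._lock:
      self._flush_locked()
```

   With `time_limit_ms=0` the sketch behaves like a purely size-based buffer; a positive value bounds how long a small write can wait before it is sent, at the cost of sending smaller batches.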
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https