[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=366597&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366597 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 06/Jan/20 12:21
Start Date: 06/Jan/20 12:21
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r363271841

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
@@ -200,6 +203,29 @@ def _get_state_cache_size(pipeline_options):
   return 0
 
 
+def _get_data_buffer_time_limit_ms(pipeline_options):
+  """Defines the time limt of the outbound data buffering.
+
+  Note: data_buffer_time_limit_ms is an experimental flag and might
+  not be available in future releases.
+
+  Returns:
+    an int indicating the time limit in milliseconds of the the outbound
+      data buffering. Default is 0 (disabled)
+  """
+  experiments = pipeline_options.view_as(DebugOptions).experiments
+  experiments = experiments if experiments else []
+
+  for experiment in experiments:
+    # There should only be 1 match so returning from the loop
+    if re.match(r'data_buffer_time_limit_ms=', experiment):
+      return int(
+          re.match(
+              r'data_buffer_time_limit_ms=(?P.*)',

Review comment:
+1 for unified experiment names across all languages.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 366597)
Time Spent: 4.5h (was: 4h 20m)

> Add time-based cache threshold support in the data service of the Python SDK harness
>
> Key: BEAM-7949
> URL: https://issues.apache.org/jira/browse/BEAM-7949
> Project: Beam
> Issue Type: Sub-task
> Components: sdk-py-harness
> Reporter: sunjincheng
> Assignee: sunjincheng
> Priority: Major
> Fix For: 2.19.0
>
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Currently only a size-based cache threshold is supported in the data service of
> the Python SDK harness. It should also support a time-based cache threshold.
> This is very important, especially for streaming jobs, which are sensitive to
> delay.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
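For illustration, the `re.match` check in `_get_data_buffer_time_limit_ms` can also be written as a plain prefix test on `name=value` experiment strings. The sketch below uses a hypothetical helper, `get_int_experiment`, which is not a Beam API. Like the quoted code, it returns the first match and falls back to a default of 0 when the experiment is absent.

```python
from typing import Iterable, Optional


def get_int_experiment(experiments: Optional[Iterable[str]], name: str,
                       default: int = 0) -> int:
    """Returns the int value of the first 'name=value' experiment.

    Hypothetical helper, not a Beam API. Falls back to `default` when the
    experiment is absent or when `experiments` is None.
    """
    prefix = name + '='
    for experiment in experiments or []:
        # Equivalent to the quoted re.match(r'data_buffer_time_limit_ms=', ...)
        # check: re.match only matches at the start of the string.
        if experiment.startswith(prefix):
            # Only the first match is used, like the early return in the diff.
            return int(experiment[len(prefix):])
    return default


experiments = ['beam_fn_api', 'data_buffer_time_limit_ms=1000']
print(get_int_experiment(experiments, 'data_buffer_time_limit_ms'))  # 1000
print(get_int_experiment(None, 'data_buffer_time_limit_ms'))         # 0
```

The key is passed as a parameter, so the same call also works for the unified `data_buffer_size_limit` key discussed in this thread.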
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=366496&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366496 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 06/Jan/20 06:41
Start Date: 06/Jan/20 06:41
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r363174479

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
+              r'data_buffer_time_limit_ms=(?P.*)',

Review comment:
Makes sense to me. I have created a PR, https://github.com/apache/beam/pull/10505, as a follow-up. It renames the config key "beam_fn_api_data_buffer_time_limit" in the Java SDK harness to "data_buffer_time_limit_ms". Please note that it also renames the config key "beam_fn_api_data_buffer_size_limit" in the Java SDK harness to "data_buffer_size_limit", so that the naming convention stays consistent.
Issue Time Tracking
-------------------
Worklog Id: (was: 366496)
Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=365440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365440 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 02/Jan/20 18:15
Start Date: 02/Jan/20 18:15
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362572952

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
+              r'data_buffer_time_limit_ms=(?P.*)',

Review comment:
I think keeping these options consistent is important so that users have a good experience across languages. Changing the Java ones to match the Python ones would also be fine, since the Java ones don't seem to follow an explicit naming convention.
Issue Time Tracking
-------------------
Worklog Id: (was: 365440)
Time Spent: 4h 10m (was: 4h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=364948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364948 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 31/Dec/19 10:14
Start Date: 31/Dec/19 10:14
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362187710

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
+              r'data_buffer_time_limit_ms=(?P.*)',

Review comment:
I also thought about this when preparing this PR. I didn't do it here because most config keys in the Java SDK harness start with "beam_fn_api_", which is not the case for the config keys in the Python SDK harness. I'm fine with unifying the config keys if you don't think that's a problem. What do you think? @lukecwik @mxm
Issue Time Tracking
-------------------
Worklog Id: (was: 364948)
Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=364700&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364700 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 30/Dec/19 18:08
Start Date: 30/Dec/19 18:08
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r362056607

## File path: sdks/python/apache_beam/runners/worker/sdk_worker_main.py ##
+              r'data_buffer_time_limit_ms=(?P.*)',

Review comment:
Should we be making sure that the experiments have the same string constant across languages ([beam_fn_api_data_buffer_time_limit](https://github.com/apache/beam/blob/4c18cb4ada2650552a0006dfffd68d0775dd76c6/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.java#L39))?
Issue Time Tracking
-------------------
Worklog Id: (was: 364700)
Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=363271&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-363271 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 25/Dec/19 12:43
Start Date: 25/Dec/19 12:43
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246

Issue Time Tracking
-------------------
Worklog Id: (was: 363271)
Time Spent: 3h 40m (was: 3.5h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=362461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362461 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 23/Dec/19 08:53
Start Date: 23/Dec/19 08:53
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-568407237

Run Python2_PVR_Flink PreCommit

Issue Time Tracking
-------------------
Worklog Id: (was: 362461)
Time Spent: 3.5h (was: 3h 20m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=362011&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-362011 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 21/Dec/19 01:58
Start Date: 21/Dec/19 01:58
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-568142289

Thanks for your great comments; I have updated the PR accordingly. ;)

Issue Time Tracking
-------------------
Worklog Id: (was: 362011)
Time Spent: 3h 20m (was: 3h 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361608 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 20/Dec/19 11:55
Start Date: 20/Dec/19 11:55
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r360343183

## File path: sdks/python/apache_beam/runners/worker/data_plane.py ##
@@ -69,24 +70,90 @@ class ClosableOutputStream(OutputStream):
   """A Outputstream for use with CoderImpls that has a close() method."""
 
+  def __init__(self, close_callback=None):
+    super(ClosableOutputStream, self).__init__()
+    self._close_callback = close_callback
+
+  def close(self):
+    if self._close_callback:
+      self._close_callback(self.get())
+
+  @staticmethod
+  def create(close_callback,
+             flush_callback,
+             data_buffer_time_limit_ms):
+    if data_buffer_time_limit_ms > 0:
+      return TimeBasedBufferingClosableOutputStream(
+          close_callback,
+          flush_callback=flush_callback,
+          time_flush_threshold_ms=data_buffer_time_limit_ms)
+    else:
+      return SizeBasedBufferingClosableOutputStream(
+          close_callback, flush_callback=flush_callback)
+
+
+class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
+  """A size-based buffering OutputStream."""
+
   def __init__(self,
                close_callback=None,  # type: Optional[Callable[[bytes], None]]
                flush_callback=None,  # type: Optional[Callable[[bytes], None]]
-               flush_threshold=_DEFAULT_FLUSH_THRESHOLD):
-    super(ClosableOutputStream, self).__init__()
-    self._close_callback = close_callback
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
+    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
     self._flush_callback = flush_callback
-    self._flush_threshold = flush_threshold
+    self._size_flush_threshold = size_flush_threshold
 
   # This must be called explicitly to avoid flushing partial elements.
   def maybe_flush(self):
-    if self._flush_callback and self.size() > self._flush_threshold:
+    if self.size() > self._size_flush_threshold:
+      self.flush()
+
+  def flush(self):
+    if self._flush_callback:
       self._flush_callback(self.get())
       self._clear()
 
+
+class TimeBasedBufferingClosableOutputStream(
+    SizeBasedBufferingClosableOutputStream):
+  """A buffering OutputStream with both time-based and size-based."""
+
+  def __init__(self,
+               close_callback=None,
+               flush_callback=None,
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD,
+               time_flush_threshold_ms=_DEFAULT_TIME_FLUSH_THRESHOLD_MS):
+    super(TimeBasedBufferingClosableOutputStream, self).__init__(
+        close_callback, flush_callback, size_flush_threshold)
+    assert time_flush_threshold_ms > 0
+    self._time_flush_threshold_ms = time_flush_threshold_ms
+    self._flush_lock = threading.Lock()
+    self._schedule_lock = threading.Lock()
+    self._closed = False
+    self._schedule_periodic_flush()
+
+  def flush(self):
+    with self._flush_lock:
+      super(TimeBasedBufferingClosableOutputStream, self).flush()
+
   def close(self):
-    if self._close_callback:
-      self._close_callback(self.get())
+    with self._schedule_lock:
+      self._closed = True
+      if self._flush_timer:
+        self._flush_timer.cancel()
+        self._flush_timer = None
+    super(TimeBasedBufferingClosableOutputStream, self).close()
+
+  def _schedule_periodic_flush(self):
+    def _periodic_flush():
+      with self._schedule_lock:
+        if not self._closed:
+          self.flush()
+          self._schedule_periodic_flush()

Review comment:
Also, this creates a `Thread` for every flush. We might just want to use a single thread, e.g. https://stackoverflow.com/a/12435256/2225100
Issue Time Tracking
-------------------
Worklog Id: (was: 361608)
Time Spent: 3h 10m (was: 3h)
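mxm's suggestion to avoid one `Thread` per flush can be sketched with a single long-lived daemon thread that sleeps on a `threading.Event`. `PeriodicFlushBuffer` below is a simplified, hypothetical stand-in for `TimeBasedBufferingClosableOutputStream`; its name and methods are assumptions, not Beam code. It flushes when the buffered size exceeds a threshold or when the period elapses. `close()` wakes the thread, joins it, and flushes whatever remains. In this sketch the period is measured from the end of the previous periodic flush.

```python
import threading
from typing import Callable, List


class PeriodicFlushBuffer:
    """Hypothetical size- and time-based buffer with one flusher thread."""

    def __init__(self, flush_callback: Callable[[bytes], None],
                 size_threshold: int, period_ms: int) -> None:
        assert period_ms > 0
        self._flush_callback = flush_callback
        self._size_threshold = size_threshold
        self._period_s = period_ms / 1000.0  # Event.wait() takes seconds.
        self._chunks: List[bytes] = []
        self._size = 0
        self._lock = threading.Lock()
        self._closed = threading.Event()
        # A single daemon thread serves every periodic flush of this buffer.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def write(self, data: bytes) -> None:
        with self._lock:
            self._chunks.append(data)
            self._size += len(data)
            if self._size > self._size_threshold:  # Size-based flush.
                self._flush_locked()

    def flush(self) -> None:
        with self._lock:
            self._flush_locked()

    def close(self) -> None:
        self._closed.set()    # Wake the flusher thread immediately.
        self._thread.join()
        self.flush()          # Emit whatever is still buffered.

    def _flush_locked(self) -> None:
        # Caller must hold self._lock; empty buffers are not emitted.
        if self._chunks:
            self._flush_callback(b''.join(self._chunks))
            self._chunks = []
            self._size = 0

    def _run(self) -> None:
        # wait() returns False on timeout (time-based flush) and True once
        # close() sets the event, which ends the loop.
        while not self._closed.wait(self._period_s):
            self.flush()


out = []
buf = PeriodicFlushBuffer(out.append, size_threshold=4, period_ms=10000)
buf.write(b'ab')      # 2 bytes: below the threshold, stays buffered.
buf.write(b'cde')     # 5 bytes > 4: flushed by size.
buf.write(b'x')
buf.close()
print(out)  # [b'abcde', b'x']
```

Here the callback runs while the lock is held. This keeps size-based and time-based flushes from interleaving, but it also means a slow callback blocks `write()`.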
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361609&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361609 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 20/Dec/19 11:55
Start Date: 20/Dec/19 11:55
Worklog Time Spent: 10m
Work Description: mxm commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r360341553

## File path: sdks/python/apache_beam/runners/worker/data_plane.py ##
+          self._schedule_periodic_flush()

Review comment:
Don't we have to set the timer before flushing? Otherwise we likely set the timer every `flush_threshold + flush_time`, where `flush_time` might vary.
Issue Time Tracking
-------------------
Worklog Id: (was: 361609)
Time Spent: 3h 10m (was: 3h)
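The drift mxm describes can be made concrete with a small deterministic simulation. `flush_start_times` is a hypothetical helper, and time is in arbitrary units rather than a real clock.

- If the next timer is armed only after a flush completes, consecutive flushes start `period + flush_time` apart.
- If the next deadline is computed from the previous deadline before flushing, start times stay on the period grid, as long as each flush finishes within one period.

```python
from typing import List, Sequence


def flush_start_times(period: float, flush_durations: Sequence[float],
                      fixed_rate: bool) -> List[float]:
    """Start time of each periodic flush under two scheduling strategies.

    fixed_rate=False: the next deadline is set after the flush finishes,
    so the flush duration is added to every period (drift).
    fixed_rate=True: the next deadline is derived from the previous one,
    independent of how long the flush took; an overrunning flush makes
    the next one start immediately.
    """
    starts = []
    now = 0
    deadline = period
    for duration in flush_durations:
        start = max(now, deadline)   # Sleep until the deadline, if not late.
        starts.append(start)
        now = start + duration       # The flush completes.
        deadline = (deadline + period) if fixed_rate else (now + period)
    return starts


print(flush_start_times(100, [30, 30, 30], fixed_rate=False))  # [100, 230, 360]
print(flush_start_times(100, [30, 30, 30], fixed_rate=True))   # [100, 200, 300]
```

A real implementation would read deadlines from a monotonic clock such as `time.monotonic()` rather than from simulated durations.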
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361582&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361582 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 20/Dec/19 10:36
Start Date: 20/Dec/19 10:36
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-567878913

Sure, I have squashed the commits into one. :)

Issue Time Tracking
-------------------
Worklog Id: (was: 361582)
Time Spent: 3h (was: 2h 50m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361539 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 20/Dec/19 09:22
Start Date: 20/Dec/19 09:22
Worklog Time Spent: 10m
Work Description: mxm commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-567854991

Looks good. @sunjincheng121 Could you squash the commits?

Issue Time Tracking
-------------------
Worklog Id: (was: 361539)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361334 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 18/Dec/19 08:30
Start Date: 18/Dec/19 08:30
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-566930239

Thanks for the review, @robertwb. I have updated the PR accordingly.

Issue Time Tracking
-------------------
Worklog Id: (was: 361334)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=361333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-361333 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 18/Dec/19 08:26
Start Date: 18/Dec/19 08:26
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r359209509

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
## @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
+# Time-based flush is enabled in the fn_api_runner for testing.
+DATA_BUFFER_TIME_LIMIT = 1000

Review comment:
Yes, it's milliseconds.

Issue Time Tracking
---
Worklog Id: (was: 361333)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360565&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360565 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358508847

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
## @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
+# Time-based flush is enabled in the fn_api_runner for testing.

Review comment:
It seems this is used for more than just testing.

Issue Time Tracking
---
Worklog Id: (was: 360565)
Time Spent: 2h (was: 1h 50m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360570 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358511055

## File path: sdks/python/apache_beam/runners/worker/data_plane.py
## @@ -292,8 +357,15 @@ def close_callback(data):
               instruction_id=instruction_id,
               transform_id=transform_id,
               data=b''))
-    return ClosableOutputStream(
-        close_callback, flush_callback=add_to_send_queue)
+
+    if self._data_buffer_time_limit > 0:
+      return TimeBasedBufferingClosableOutputStream(

Review comment:
Continuing from the comment above, maybe make a helper to avoid duplicating the logic.

Issue Time Tracking
---
Worklog Id: (was: 360570)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360568&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360568 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358508686

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
## @@ -126,6 +126,9 @@
 # The cache is disabled in production for other runners.
 STATE_CACHE_SIZE = 100
+# Time-based flush is enabled in the fn_api_runner for testing.
+DATA_BUFFER_TIME_LIMIT = 1000

Review comment:
Is there a time unit here?

Issue Time Tracking
---
Worklog Id: (was: 360568)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360567&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360567 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358510103

## File path: sdks/python/apache_beam/runners/worker/data_plane.py
## @@ -69,24 +70,78 @@
 class ClosableOutputStream(OutputStream):
   """A Outputstream for use with CoderImpls that has a close() method."""
+  def __init__(self, close_callback=None):
+    super(ClosableOutputStream, self).__init__()
+    self._close_callback = close_callback
+
+  def close(self):
+    if self._close_callback:
+      self._close_callback(self.get())
+
+
+class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
+  """A size-based buffering OutputStream."""
+
   def __init__(self,
                close_callback=None,  # type: Optional[Callable[[bytes], None]]
                flush_callback=None,  # type: Optional[Callable[[bytes], None]]
-               flush_threshold=_DEFAULT_FLUSH_THRESHOLD):
-    super(ClosableOutputStream, self).__init__()
+               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
+    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
     self._close_callback = close_callback

Review comment:
Redundant with `super().__init__()`?

Issue Time Tracking
---
Worklog Id: (was: 360567)
Time Spent: 2h 20m (was: 2h 10m)
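The redundancy flagged in this review can be illustrated with a simplified, self-contained sketch of the two classes from the diff, with the duplicate assignment dropped because the parent constructor already stores `close_callback`. This is a minimal sketch, not Beam's code: `OutputStream` here is a hypothetical in-memory stand-in (Beam's real one is a compiled coder stream), and its `reset()` helper and the 10 MiB default threshold are assumptions made for illustration.

```python
from typing import Callable, Optional  # used only by the type comments

# Assumed default for illustration (10 MiB); not necessarily Beam's value.
_DEFAULT_SIZE_FLUSH_THRESHOLD = 10 << 20


class OutputStream(object):
  """Hypothetical in-memory stand-in for Beam's OutputStream."""

  def __init__(self):
    self._data = bytearray()

  def write(self, b):
    self._data.extend(b)

  def get(self):
    return bytes(self._data)

  def size(self):
    return len(self._data)

  def reset(self):
    # Hypothetical helper: discard bytes that have already been flushed.
    self._data = bytearray()


class ClosableOutputStream(OutputStream):
  """An OutputStream whose close() hands the remaining bytes to a callback."""

  def __init__(self, close_callback=None):
    # type: (Optional[Callable[[bytes], None]]) -> None
    super(ClosableOutputStream, self).__init__()
    self._close_callback = close_callback

  def close(self):
    if self._close_callback:
      self._close_callback(self.get())


class SizeBasedBufferingClosableOutputStream(ClosableOutputStream):
  """A size-based buffering OutputStream."""

  def __init__(self,
               close_callback=None,  # type: Optional[Callable[[bytes], None]]
               flush_callback=None,  # type: Optional[Callable[[bytes], None]]
               size_flush_threshold=_DEFAULT_SIZE_FLUSH_THRESHOLD):
    # The parent constructor already stores close_callback, so it is not
    # assigned a second time here.
    super(SizeBasedBufferingClosableOutputStream, self).__init__(close_callback)
    self._flush_callback = flush_callback
    self._size_flush_threshold = size_flush_threshold

  def maybe_flush(self):
    # Flush once the buffered bytes reach the size threshold.
    if self._flush_callback and self.size() >= self._size_flush_threshold:
      self._flush_callback(self.get())
      self.reset()
```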
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360566&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360566 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358509256

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner.py
## @@ -1581,15 +1585,20 @@ def __init__(self,
     super(EmbeddedGrpcWorkerHandler, self).__init__(state, provision_info,
                                                     grpc_server)
     if payload:
-      state_cache_size = payload.decode('ascii')
+      state_cache_size, data_buffer_time_limit = \
+          payload.decode('ascii').split(',')

Review comment:
Here we're changing the payload in a backwards-incompatible way. Maybe at this point we should make it a JSON dict?

Issue Time Tracking
---
Worklog Id: (was: 360566)
Time Spent: 2h 10m (was: 2h)
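One way to act on the JSON suggestion while staying compatible with older workers is to accept both a new JSON object and the legacy payload, which was just the state cache size as an ASCII integer. This is a minimal sketch, not the format the PR adopted: the helper names (`encode_worker_payload`, `decode_worker_payload`) and the key names (`state_cache_size`, `data_buffer_time_limit_ms`) are hypothetical.

```python
import json


def encode_worker_payload(state_cache_size, data_buffer_time_limit_ms):
  """Hypothetical encoder: serializes worker settings as a JSON object."""
  return json.dumps({
      'state_cache_size': state_cache_size,
      'data_buffer_time_limit_ms': data_buffer_time_limit_ms,
  }).encode('ascii')


def decode_worker_payload(payload):
  """Hypothetical decoder accepting the JSON or the legacy plain-int format."""
  settings = {'state_cache_size': 0, 'data_buffer_time_limit_ms': 0}
  if not payload:
    return settings
  text = payload.decode('ascii')
  try:
    parsed = json.loads(text)
  except ValueError:
    parsed = None
  if isinstance(parsed, dict):
    # New format: any keys that are present override the defaults.
    settings.update(parsed)
  else:
    # Legacy format: the payload is only the state cache size, e.g. b'100'.
    settings['state_cache_size'] = int(text)
  return settings
```

Because missing keys fall back to defaults, new settings could be added later without breaking this kind of split-on-comma compatibility again.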
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360569 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 22:59
Start Date: 16/Dec/19 22:59
Worklog Time Spent: 10m
Work Description: robertwb commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#discussion_r358510692

## File path: sdks/python/apache_beam/runners/worker/data_plane.py
## @@ -195,8 +252,14 @@ def add_to_inverse_output(data):
               instruction_id=instruction_id,
               transform_id=transform_id,
               data=data))
-    return ClosableOutputStream(
-        add_to_inverse_output, flush_callback=add_to_inverse_output)
+    if self._data_buffer_time_limit > 0:

Review comment:
It'd be better if one didn't have to choose one or the other. Would it be possible to compose them with wrappers? Otherwise, I'd make them the same class with both capabilities (optionally configured).

Issue Time Tracking
---
Worklog Id: (was: 360569)
Time Spent: 2h 20m (was: 2h 10m)
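The "same class with both capabilities (optionally configured)" option might look roughly like the following minimal sketch. It assumes a hypothetical `BufferingClosableOutputStream` that buffers into a plain `bytearray` instead of Beam's `OutputStream`. Writes trigger a size-based flush. When `time_flush_threshold_ms` is positive, a daemon thread also flushes on a fixed interval, so buffered bytes are flushed even if no further writes arrive. A lock serializes writes with the background flush.

```python
import threading


class BufferingClosableOutputStream(object):
  """Hypothetical stream with optional size- and time-based flushing."""

  def __init__(self, close_callback=None, flush_callback=None,
               size_flush_threshold=10 << 20, time_flush_threshold_ms=0):
    self._buffer = bytearray()
    self._lock = threading.Lock()
    self._close_callback = close_callback
    self._flush_callback = flush_callback
    self._size_flush_threshold = size_flush_threshold
    self._stopped = threading.Event()
    self._flusher = None
    if flush_callback and time_flush_threshold_ms > 0:
      # Background flusher so data is sent even when writes stop arriving.
      self._flusher = threading.Thread(
          target=self._periodic_flush,
          args=(time_flush_threshold_ms / 1000.0,))
      self._flusher.daemon = True
      self._flusher.start()

  def write(self, data):
    with self._lock:
      self._buffer.extend(data)
      if len(self._buffer) >= self._size_flush_threshold:
        self._flush_locked()

  def flush(self):
    with self._lock:
      self._flush_locked()

  def _flush_locked(self):
    # Caller must hold self._lock; empty buffers are not flushed.
    if self._flush_callback and self._buffer:
      self._flush_callback(bytes(self._buffer))
      self._buffer = bytearray()

  def _periodic_flush(self, interval_secs):
    # Event.wait returns False on timeout and True once close() sets it.
    while not self._stopped.wait(interval_secs):
      self.flush()

  def close(self):
    self._stopped.set()
    if self._flusher is not None:
      self._flusher.join()
    with self._lock:
      data, self._buffer = bytes(self._buffer), bytearray()
    if self._close_callback:
      self._close_callback(data)
```

Holding the lock while invoking `flush_callback` keeps the sketch simple, but it blocks writers during a slow flush. A production version might hand the bytes off outside the lock.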
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=360029&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-360029 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 16/Dec/19 01:45
Start Date: 16/Dec/19 01:45
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-565873164

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 360029)
Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=359081&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-359081 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 13/Dec/19 02:25
Start Date: 13/Dec/19 02:25
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-565273080

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 359081)
Time Spent: 1h 40m (was: 1.5h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358668&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358668 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 12/Dec/19 12:50
Start Date: 12/Dec/19 12:50
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564993879

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 358668)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358402&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358402 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 12/Dec/19 07:49
Start Date: 12/Dec/19 07:49
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564890797

Rebased the code. :)

Issue Time Tracking
---
Worklog Id: (was: 358402)
Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=358405&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-358405 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 12/Dec/19 07:49
Start Date: 12/Dec/19 07:49
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-564891058

R: @robertwb I'd appreciate it if you could take a quick look. :)

Issue Time Tracking
---
Worklog Id: (was: 358405)
Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=356151&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-356151 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 09/Dec/19 13:16
Start Date: 09/Dec/19 13:16
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-563232918

Thanks for the review, @mxm!

Issue Time Tracking
---
Worklog Id: (was: 356151)
Time Spent: 1h (was: 50m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=355980&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-355980 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 09/Dec/19 07:38
Start Date: 09/Dec/19 07:38
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-563105146

Hi @mxm, I definitely agree that the bundle timeout could be lowered to reduce latency. However, I'm not sure that is the best approach for all use cases and users, because finishing a bundle has overhead: all the state cached in the SDK harness is flushed back to the runner whenever a bundle finishes. The lower the bundle timeout, the more unnecessary state traffic between the runner and the SDK harness. The solution proposed in this PR (periodic flush) avoids this problem while still lowering latency.

Time-based cache threshold support was added to the Java data service in #9949. This PR adds similar functionality to the data service of the Python SDK harness. What do you think?

Best,
Jincheng

Issue Time Tracking
---
Worklog Id: (was: 355980)
Time Spent: 50m (was: 40m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=352471&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352471 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 03/Dec/19 07:16
Start Date: 03/Dec/19 07:16
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-561034383

R: @lukecwik @mxm

Issue Time Tracking
---
Worklog Id: (was: 352471)
Time Spent: 40m (was: 0.5h)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351608 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 01/Dec/19 01:39
Start Date: 01/Dec/19 01:39
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-560037602

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 351608)
Time Spent: 0.5h (was: 20m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351177&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351177 ]

ASF GitHub Bot logged work on BEAM-7949:
Author: ASF GitHub Bot
Created on: 28/Nov/19 23:49
Start Date: 28/Nov/19 23:49
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on issue #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246#issuecomment-559620534

Run Python PreCommit

Issue Time Tracking
---
Worklog Id: (was: 351177)
Time Spent: 20m (was: 10m)
[jira] [Work logged] (BEAM-7949) Add time-based cache threshold support in the data service of the Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-7949?focusedWorklogId=351095&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-351095 ]

ASF GitHub Bot logged work on BEAM-7949:

Author: ASF GitHub Bot
Created on: 28/Nov/19 18:11
Start Date: 28/Nov/19 18:11
Worklog Time Spent: 10m
Work Description: sunjincheng121 commented on pull request #10246: [BEAM-7949] Add time-based cache threshold support in the data service of the Python SDK harness
URL: https://github.com/apache/beam/pull/10246

Currently, only a size-based cache threshold is supported in the data service of the Python SDK harness. This PR adds support for a time-based cache threshold. This is especially useful for streaming jobs that are sensitive to latency.
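To illustrate how a size-based and a time-based threshold can work together on an outbound buffer, here is a minimal sketch. It assumes a hypothetical `FlushingBuffer` class that hands buffered bytes to a `flush_fn` callback. It is not the Beam SDK harness implementation, and the class, parameter, and callback names are illustrative only. A write flushes once the buffered size reaches `size_threshold_bytes`. Separately, a background thread flushes any pending data every `time_limit_ms`, so low-volume streams are not held back until the size threshold is eventually reached.

```python
import threading
import time


class FlushingBuffer:
  """Hypothetical outbound buffer flushed on a size OR a time threshold.

  Illustrative sketch only; not the Beam SDK harness implementation.
  """

  def __init__(self, flush_fn, size_threshold_bytes, time_limit_ms=0):
    self._flush_fn = flush_fn  # receives the concatenated buffered bytes
    self._size_threshold = size_threshold_bytes
    self._time_limit_s = time_limit_ms / 1000.0
    self._buffer = []
    self._size = 0
    self._lock = threading.Lock()
    self._closed = threading.Event()
    self._thread = None
    if time_limit_ms > 0:  # 0 disables the time-based threshold
      self._thread = threading.Thread(target=self._periodic_flush)
      self._thread.daemon = True
      self._thread.start()

  def write(self, data):
    with self._lock:
      self._buffer.append(data)
      self._size += len(data)
      # Size-based threshold: flush as soon as enough bytes are pending.
      if self._size >= self._size_threshold:
        self._flush_locked()

  def _flush_locked(self):
    # Caller must hold self._lock; does nothing if the buffer is empty.
    if self._buffer:
      payload = b''.join(self._buffer)
      self._buffer = []
      self._size = 0
      self._flush_fn(payload)

  def _periodic_flush(self):
    # Time-based threshold: Event.wait returns False on timeout and
    # True once close() sets the event, which ends the loop.
    while not self._closed.wait(self._time_limit_s):
      with self._lock:
        self._flush_locked()

  def close(self):
    # Stop the periodic thread, then flush whatever is still pending.
    self._closed.set()
    if self._thread is not None:
      self._thread.join()
    with self._lock:
      self._flush_locked()
```

For example, `FlushingBuffer(sent.append, size_threshold_bytes=10, time_limit_ms=50)` delivers a single 3-byte write after roughly 50 ms, even though the size threshold is never reached. Calling `flush_fn` while holding the lock keeps flushes ordered and simple, but it blocks writers during a slow send. A production data channel might instead hand payloads off to a separate sender.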