This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bcb40cf4e4a Change caching of global window inputs to be guarded by experiment (#31013)
bcb40cf4e4a is described below

commit bcb40cf4e4a9b9045b51162edab09cf245456038
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Thu Apr 18 10:37:07 2024 +0200

    Change caching of global window inputs to be guarded by experiment (#31013)
    
    * Change caching of global window inputs to be guarded by experiment
    disable_global_windowed_args_caching
---
 sdks/python/apache_beam/runners/common.pxd |  4 +-
 sdks/python/apache_beam/runners/common.py  | 75 ++++++++++++++++++++----------
 2 files changed, 54 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 9fb44af6377..683bf8fcac1 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -100,7 +100,9 @@ cdef class PerWindowInvoker(DoFnInvoker):
   cdef dict kwargs_for_process_batch
   cdef list placeholders_for_process_batch
   cdef bint has_windowed_inputs
-  cdef bint cache_globally_windowed_args
+  cdef bint recalculate_window_args
+  cdef bint has_cached_window_args
+  cdef bint has_cached_window_batch_args
   cdef object process_method
   cdef object process_batch_method
   cdef bint is_splittable
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 82ff939dbae..7a1cef4005e 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -761,6 +761,17 @@ class PerWindowInvoker(DoFnInvoker):
       self.current_window_index = None
       self.stop_window_index = None
 
+    # TODO(https://github.com/apache/beam/issues/28776): Remove caching after
+    # fully rolling out.
+    # If true, always recalculate window args. If false, has_cached_window_args
+    # and has_cached_window_batch_args will be set to true if the corresponding
+    # args/kwargs attributes have been updated and should be reused directly.
+    self.recalculate_window_args = (
+        self.has_windowed_inputs or 'disable_global_windowed_args_caching' in
+        RuntimeValueProvider.experiments)
+    self.has_cached_window_args = False
+    self.has_cached_window_batch_args = False
+
     # Try to prepare all the arguments that can just be filled in
     # without any additional work. in the process function.
     # Also cache all the placeholders needed in the process function.
@@ -921,16 +932,23 @@ class PerWindowInvoker(DoFnInvoker):
                                  additional_kwargs,
                                 ):
     # type: (...) -> Optional[SplitResultResidual]
-    if self.has_windowed_inputs:
-      assert len(windowed_value.windows) <= 1
-      window, = windowed_value.windows
+    if self.has_cached_window_args:
+      args_for_process, kwargs_for_process = (
+          self.args_for_process, self.kwargs_for_process)
     else:
-      window = GlobalWindow()
-    side_inputs = [si[window] for si in self.side_inputs]
-    side_inputs.extend(additional_args)
-    args_for_process, kwargs_for_process = util.insert_values_in_args(
-        self.args_for_process, self.kwargs_for_process,
-        side_inputs)
+      if self.has_windowed_inputs:
+        assert len(windowed_value.windows) <= 1
+        window, = windowed_value.windows
+      else:
+        window = GlobalWindow()
+      side_inputs = [si[window] for si in self.side_inputs]
+      side_inputs.extend(additional_args)
+      args_for_process, kwargs_for_process = util.insert_values_in_args(
+          self.args_for_process, self.kwargs_for_process, side_inputs)
+      if not self.recalculate_window_args:
+        self.args_for_process, self.kwargs_for_process = (
+            args_for_process, kwargs_for_process)
+        self.has_cached_window_args = True
 
     # Extract key in the case of a stateful DoFn. Note that in the case of a
     # stateful DoFn, we set during __init__ self.has_windowed_inputs to be
@@ -1012,20 +1030,29 @@ class PerWindowInvoker(DoFnInvoker):
   ):
     # type: (...) -> Optional[SplitResultResidual]
 
-    if self.has_windowed_inputs:
-      assert isinstance(windowed_batch, HomogeneousWindowedBatch)
-      assert len(windowed_batch.windows) <= 1
-      window, = windowed_batch.windows
+    if self.has_cached_window_batch_args:
+      args_for_process_batch, kwargs_for_process_batch = (
+          self.args_for_process_batch, self.kwargs_for_process_batch)
     else:
-      window = GlobalWindow()
-    side_inputs = [si[window] for si in self.side_inputs]
-    side_inputs.extend(additional_args)
-    (args_for_process_batch, kwargs_for_process_batch) = (
-        util.insert_values_in_args(
-            self.args_for_process_batch,
-            self.kwargs_for_process_batch,
-            side_inputs,
-        ))
+      if self.has_windowed_inputs:
+        assert isinstance(windowed_batch, HomogeneousWindowedBatch)
+        assert len(windowed_batch.windows) <= 1
+        window, = windowed_batch.windows
+      else:
+        window = GlobalWindow()
+      side_inputs = [si[window] for si in self.side_inputs]
+      side_inputs.extend(additional_args)
+      args_for_process_batch, kwargs_for_process_batch = (
+          util.insert_values_in_args(
+              self.args_for_process_batch,
+              self.kwargs_for_process_batch,
+              side_inputs,
+          )
+      )
+      if not self.recalculate_window_args:
+        self.args_for_process_batch, self.kwargs_for_process_batch = (
+            args_for_process_batch, kwargs_for_process_batch)
+        self.has_cached_window_batch_args = True
 
     for i, p in self.placeholders_for_process_batch:
       if core.DoFn.ElementParam == p:
@@ -1541,8 +1568,8 @@ class _OutputHandler(OutputHandler):
                tagged_receivers,  # type: Mapping[Optional[str], Receiver]
                per_element_output_counter,
                output_batch_converter, # type: Optional[BatchConverter]
-               process_yields_batches, # type: bool,
-               process_batch_yields_elements, # type: bool,
+               process_yields_batches, # type: bool
+               process_batch_yields_elements, # type: bool
                ):
     """Initializes ``_OutputHandler``.
 

Reply via email to