This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f5f7a471321 Merge pull request #26284: Fix GroupIntoBatches hold
f5f7a471321 is described below

commit f5f7a471321c903174b452a1982c5183a79ac6bc
Author: Reuven Lax <re...@google.com>
AuthorDate: Fri Apr 14 19:24:25 2023 -0700

    Merge pull request #26284: Fix GroupIntoBatches hold
---
 .../java/org/apache/beam/sdk/transforms/GroupIntoBatches.java     | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 311a3dac6ca..3616cc2e59f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -580,6 +580,7 @@ public class GroupIntoBatches<K, InputT>
             timerTs,
             minBufferedTs);
         bufferingTimer.clear();
+        holdTimer.clear();
       }
     }
 
@@ -593,13 +594,18 @@ public class GroupIntoBatches<K, InputT>
         @StateId(NUM_BYTES_IN_BATCH_ID) CombiningState<Long, long[], Long> 
storedBatchSizeBytes,
         @StateId(TIMER_TIMESTAMP) ValueState<Long> timerTs,
         @StateId(MIN_BUFFERED_TS) CombiningState<Long, long[], Long> 
minBufferedTs,
-        @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer) {
+        @TimerId(END_OF_BUFFERING_ID) Timer bufferingTimer,
+        @TimerId(TIMER_HOLD_ID) Timer holdTimer) {
       LOG.debug(
           "*** END OF BUFFERING *** for timer timestamp {} with buffering 
duration {}",
           timestamp,
           maxBufferingDuration);
       flushBatch(
           receiver, key, batch, storedBatchSize, storedBatchSizeBytes, 
timerTs, minBufferedTs);
+      // Generally this is a noop, since holdTimer is not set if 
bufferingTimer is set. However we
+      // delete the holdTimer
+      // here in order to allow users to modify this policy on pipeline update.
+      holdTimer.clear();
     }
 
     @OnWindowExpiration

Reply via email to