This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb310e7e907 Change type for UnboundedReaderMaxReadTimeSec (#31037)
bb310e7e907 is described below

commit bb310e7e90720b620f1089574f1656ca84a3656d
Author: Radosław Stankiewicz <radosl...@google.com>
AuthorDate: Thu Apr 18 21:50:05 2024 +0200

    Change type for UnboundedReaderMaxReadTimeSec (#31037)
    
    * add ms part for UnboundedReader checkpointing
    
    * typo
    
    * spotless
    
    * spotless
    
    * spotless
    
    * [IntLongMath] Expression of type int may overflow before being assigned to a long
    
    * readerMaxReadTime sec as double
    
    * readerMaxReadTime sec as double
    
    * readerMaxReadTime sec as double
    
    * readerMaxReadTime sec as double
    
    * spotless
---
 .../runners/dataflow/options/DataflowPipelineDebugOptions.java   | 9 +++++----
 .../apache/beam/runners/dataflow/worker/WorkerCustomSources.java | 4 +++-
 .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java    | 7 ++++---
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 30496dec296..3f6c47ece68 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -218,11 +218,12 @@ public interface DataflowPipelineDebugOptions
 
   /** The max amount of time an UnboundedReader is consumed before 
checkpointing. */
   @Description(
-      "The max amount of time before an UnboundedReader is consumed before 
checkpointing, in seconds.")
-  @Default.Integer(10)
-  Integer getUnboundedReaderMaxReadTimeSec();
+      "The max amount of time before an UnboundedReader is consumed before 
checkpointing, "
+          + "in seconds. Duration can be set to fractions of seconds. ")
+  @Default.Double(10.0)
+  double getUnboundedReaderMaxReadTimeSec();
 
-  void setUnboundedReaderMaxReadTimeSec(Integer value);
+  void setUnboundedReaderMaxReadTimeSec(double value);
 
   /** The max elements read from an UnboundedReader before checkpointing. */
   @Description("The max elements read from an UnboundedReader before 
checkpointing. ")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index 8c086016ee9..a8e358f19e0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -798,7 +798,9 @@ public class WorkerCustomSources {
       DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
       this.endTime =
           Instant.now()
-              
.plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
+              .plus(
+                  Duration.millis(
+                      (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() 
* 1000L)));
       this.maxElems = debugOptions.getUnboundedReaderMaxElements();
       this.backoffFactory =
           FluentBackoff.DEFAULT
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index d451ec093f7..261567930fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -598,6 +598,7 @@ public class WorkerCustomSourcesTest {
     int maxElements = 10;
     DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
     debugOptions.setUnboundedReaderMaxElements(maxElements);
+    debugOptions.setUnboundedReaderMaxReadTimeSec(10);
 
     ByteString state = ByteString.EMPTY;
     for (int i = 0; i < 10 * maxElements;
@@ -645,10 +646,10 @@ public class WorkerCustomSourcesTest {
         numReadOnThisIteration++;
       }
       Instant afterReading = Instant.now();
-      long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
+      double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
       assertThat(
-          new Duration(beforeReading, afterReading).getStandardSeconds(),
-          lessThanOrEqualTo(maxReadSec + 1));
+          new Duration(beforeReading, afterReading).getMillis(),
+          lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L)));
       assertThat(
           numReadOnThisIteration, 
lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));
 

Reply via email to