This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch revert-31037-readerMaxReadTimeSec_as_double
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5c8f548c944a9bbc32e07a13eefaac5493b26808
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Tue Apr 23 10:42:04 2024 -0400

    Revert "Change type for UnboundedReaderMaxReadTimeSec (#31037)"
    
    This reverts commit bb310e7e90720b620f1089574f1656ca84a3656d.
---
 .../runners/dataflow/options/DataflowPipelineDebugOptions.java   | 9 ++++-----
 .../apache/beam/runners/dataflow/worker/WorkerCustomSources.java | 4 +---
 .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java    | 7 +++----
 3 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
index 3f6c47ece68..30496dec296 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java
@@ -218,12 +218,11 @@ public interface DataflowPipelineDebugOptions
 
   /** The max amount of time an UnboundedReader is consumed before checkpointing. */
   @Description(
-      "The max amount of time before an UnboundedReader is consumed before checkpointing, "
-          + "in seconds. Duration can be set to fractions of seconds. ")
-  @Default.Double(10.0)
-  double getUnboundedReaderMaxReadTimeSec();
+      "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.")
+  @Default.Integer(10)
+  Integer getUnboundedReaderMaxReadTimeSec();
 
-  void setUnboundedReaderMaxReadTimeSec(double value);
+  void setUnboundedReaderMaxReadTimeSec(Integer value);
 
   /** The max elements read from an UnboundedReader before checkpointing. */
   @Description("The max elements read from an UnboundedReader before checkpointing. ")
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
index a8e358f19e0..8c086016ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java
@@ -798,9 +798,7 @@ public class WorkerCustomSources {
       DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
       this.endTime =
           Instant.now()
-              .plus(
-                  Duration.millis(
-                      (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L)));
+              .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec()));
       this.maxElems = debugOptions.getUnboundedReaderMaxElements();
       this.backoffFactory =
           FluentBackoff.DEFAULT
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 261567930fe..d451ec093f7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -598,7 +598,6 @@ public class WorkerCustomSourcesTest {
     int maxElements = 10;
     DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
     debugOptions.setUnboundedReaderMaxElements(maxElements);
-    debugOptions.setUnboundedReaderMaxReadTimeSec(10);
 
     ByteString state = ByteString.EMPTY;
     for (int i = 0; i < 10 * maxElements;
@@ -646,10 +645,10 @@ public class WorkerCustomSourcesTest {
         numReadOnThisIteration++;
       }
       Instant afterReading = Instant.now();
-      double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
+      long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec();
       assertThat(
-          new Duration(beforeReading, afterReading).getMillis(),
-          lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L)));
+          new Duration(beforeReading, afterReading).getStandardSeconds(),
+          lessThanOrEqualTo(maxReadSec + 1));
       assertThat(
           numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));
 

Reply via email to