This is an automated email from the ASF dual-hosted git repository. jrmccluskey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new bb310e7e907 Change type for UnboundedReaderMaxReadTimeSec (#31037) bb310e7e907 is described below commit bb310e7e90720b620f1089574f1656ca84a3656d Author: Radosław Stankiewicz <radosl...@google.com> AuthorDate: Thu Apr 18 21:50:05 2024 +0200 Change type for UnboundedReaderMaxReadTimeSec (#31037) * add ms part for UnboundedReader checkpointing * typo * spotless * spotless * spotless * [IntLongMath] Expression of type int may overflow before being assigned to a long * readerMaxReadTime sec as double * readerMaxReadTime sec as double * readerMaxReadTime sec as double * readerMaxReadTime sec as double * spotless --- .../runners/dataflow/options/DataflowPipelineDebugOptions.java | 9 +++++---- .../apache/beam/runners/dataflow/worker/WorkerCustomSources.java | 4 +++- .../beam/runners/dataflow/worker/WorkerCustomSourcesTest.java | 7 ++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java index 30496dec296..3f6c47ece68 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java @@ -218,11 +218,12 @@ public interface DataflowPipelineDebugOptions /** The max amount of time an UnboundedReader is consumed before checkpointing. */ @Description( - "The max amount of time before an UnboundedReader is consumed before checkpointing, in seconds.") - @Default.Integer(10) - Integer getUnboundedReaderMaxReadTimeSec(); + "The max amount of time before an UnboundedReader is consumed before checkpointing, " + + "in seconds. Duration can be set to fractions of seconds. ") + @Default.Double(10.0) + double getUnboundedReaderMaxReadTimeSec(); - void setUnboundedReaderMaxReadTimeSec(Integer value); + void setUnboundedReaderMaxReadTimeSec(double value); /** The max elements read from an UnboundedReader before checkpointing. */ @Description("The max elements read from an UnboundedReader before checkpointing. ") diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java index 8c086016ee9..a8e358f19e0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSources.java @@ -798,7 +798,9 @@ public class WorkerCustomSources { DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); this.endTime = Instant.now() - .plus(Duration.standardSeconds(debugOptions.getUnboundedReaderMaxReadTimeSec())); + .plus( + Duration.millis( + (long) (debugOptions.getUnboundedReaderMaxReadTimeSec() * 1000L))); this.maxElems = debugOptions.getUnboundedReaderMaxElements(); this.backoffFactory = FluentBackoff.DEFAULT diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index d451ec093f7..261567930fe 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -598,6 +598,7 @@ public class WorkerCustomSourcesTest { int maxElements = 10; DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class); debugOptions.setUnboundedReaderMaxElements(maxElements); + debugOptions.setUnboundedReaderMaxReadTimeSec(10); ByteString state = ByteString.EMPTY; for (int i = 0; i < 10 * maxElements; @@ -645,10 +646,10 @@ public class WorkerCustomSourcesTest { numReadOnThisIteration++; } Instant afterReading = Instant.now(); - long maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); + double maxReadSec = debugOptions.getUnboundedReaderMaxReadTimeSec(); assertThat( - new Duration(beforeReading, afterReading).getStandardSeconds(), - lessThanOrEqualTo(maxReadSec + 1)); + new Duration(beforeReading, afterReading).getMillis(), + lessThanOrEqualTo((long) ((maxReadSec + 1) * 1000L))); assertThat( numReadOnThisIteration, lessThanOrEqualTo(debugOptions.getUnboundedReaderMaxElements()));