FileIO.match doesn't allow one to configure how the watermark advances and
it assumes that the watermark during polling is always the current system
time[1].

Because of this the downstream watermark advancement is limited. When an
element and restriction starts processing, the maximum you can hold the
output watermark back by for this element and restriction pair is limited
to the current input watermark (a common value to use is the current
element's timestamp as the lower bound for all future output but if that
element is late the output you produce may or may not be late (depends on
downstream windowing strategy)). Holding this watermark back is important
since many of these elements and restrictions could be processed in
parallel at different rates.

Based upon your implementation, you wouldn't need to control the watermark
from the file reading splittable DoFn if FileIO.match allowed you to say
what the watermark is after each polling round and allowed you to set the
timestamp for each match found. This initial setting of the watermark
during polling would be properly handled by the runner to block watermark
advancement for those elements.

Minor comments not related to your issue but would improve your
implementation:
1) Typically you set the watermark right before returning. You are missing
this from the failed tryClaim loop return.
2) You should structure your loop not based upon the end of the current
restriction but continue processing till tryClaim fails. For example:
      @ProcessElement
      public void processElement(@Element String fileName,
RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Integer>
outputReceiver) throws IOException {
        RandomAccessFile file = new RandomAccessFile(fileName, "r");
        seekToNextRecordBoundaryInFile(file,
tracker.currentRestriction().getFrom());
        while (tracker.tryClaim(file.getFilePointer())) {
          outputReceiver.output(readNextRecord(file));
        }
      }
3) ensureTimestampWithinBounds is dangerous as you're masking a possible
data issue since the code either parsed some filename incorrectly. It is
likely that you copied this from Beam code and it is used there because
user implementations of UnboundedSource were incorrectly setting the
watermark outside of the bounds and there is no way to fix them.

1:
https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666

On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <[email protected]>
wrote:

> Thank you for a quick response. I tried to follow the doc attached and
> read existing Beam code that uses the Splittable DoFns, and I made some
> progress.
>
> I created a simple pipeline that matches given filepattern, and uses
> splittable dofn to control event times and watermarks. The pipeline expects
> files with the following name patterns:
>
> *yyyy-MM-dd*
>
> *yyyy-MM-dd.complete*
>
> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs lines
> of the file using *outputWithTimestamp(..., timestamp)*. Additionally, it
> calls *watermarkEstimator.setWatermark(timestamp)*. In both cases the
> timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>
> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it calls
> *watermarkEstimator.setWatermark(timestamp)*, where timestamp is 
> *yyyy-MM-ddT00:00:00.000Z
> plus one day* - hence it advances to the next day.
>
> I am at the point when the following unit test fails the inWindow()
> assertions, the last assertion passes. It seems that even though I
> call watermarkEstimator.setWatermark() the window is not being closed.
>
> I would appreciate help/suggestions on what I am missing.
>
> Here is a unit test. The function being tested is getData() defined below.
>
> public void testGetDataWithNewFiles() throws InterruptedException {
>   final Duration duration = Duration.standardDays(1);
>
>   IntervalWindow firstWindow =
>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>   logger.info("first window {}", firstWindow);
>   IntervalWindow secondWindow =
>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>   logger.info("second window {}", secondWindow);
>
>   MatchConfiguration matchConfiguration =
>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>           .continuously(
>               Duration.millis(100),
>               
> Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>
>   PCollection<KV<String, Long>> output =
>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + "/*", 
> matchConfiguration)
>           .apply("Window", Window.into(FixedWindows.of(duration)))
>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>
>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>
>   Thread writer =
>       new Thread(
>           () -> {
>             try {
>               Thread.sleep(1000);
>
>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>
>               Thread.sleep(1000);
>
>               Path firstPathComplete = 
> tmpFolder.newFile("2020-01-01.complete").toPath();
>               Files.write(firstPathComplete, Arrays.asList());
>
>               Thread.sleep(1000);
>
>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>
>               Thread.sleep(1000);
>
>               Path secondPathComplete = 
> tmpFolder.newFile("2020-01-02.complete").toPath();
>               Files.write(secondPathComplete, Arrays.asList());
>
>             } catch (IOException | InterruptedException e) {
>               throw new RuntimeException(e);
>             }
>           });
>   writer.start();
>
>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>   PAssert.that(output)
>       .inWindow(firstWindow)
>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), 
> KV.of("my-key", 3L));
>
>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>   PAssert.that(output)
>       .inWindow(secondWindow)
>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), 
> KV.of("my-key", 6L));
>
>   // THIS ASSERTION PASSES.
>   PAssert.that(output)
>       .containsInAnyOrder(
>           KV.of("my-key", 1L),
>           KV.of("my-key", 2L),
>           KV.of("my-key", 3L),
>           KV.of("my-key", 4L),
>           KV.of("my-key", 5L),
>           KV.of("my-key", 6L));
>
>   p.run();
>
>   writer.join();
> }
>
> Here is the code. Essentially, I am using *FileIO.match()* to match
> filepattern. Then the file *Metadata* is processed by my custom
> Splittable DoFn.
>
> static PCollection<KV<String, Long>> getData(
>     Pipeline pipeline, String filepattern, MatchConfiguration 
> matchConfiguration) {
>   PCollection<Metadata> matches =
>       pipeline.apply(
>           
> FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get 
> Data"));
> }
>
> /**
>  * Processes matched files by outputting key-value pairs where key is equal 
> to "my-key" and values
>  * are Long values corresponding to the lines in the file. In the case file 
> does not contain one
>  * Long per line, IOException is thrown.
>  */
> @DoFn.BoundedPerElement
> private static final class ReadFileFn extends DoFn<Metadata, KV<String, 
> Long>> {
>   private static final Logger logger = 
> LoggerFactory.getLogger(ReadFileFn.class);
>
>   @ProcessElement
>   public void processElement(
>       ProcessContext c,
>       RestrictionTracker<OffsetRange, Long> tracker,
>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>       throws IOException {
>     Metadata metadata = c.element();
>     logger.info(
>         "reading {} with restriction {} @ {}",
>         metadata,
>         tracker.currentRestriction(),
>         c.timestamp());
>     String filename = metadata.resourceId().toString();
>     Instant timestamp = getTimestamp(filename);
>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>       String line;
>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) 
> {
>         if (lineNumber < tracker.currentRestriction().getFrom()
>             || lineNumber >= tracker.currentRestriction().getTo()) {
>           continue;
>         }
>         if (!tracker.tryClaim(lineNumber)) {
>           logger.info("failed to claim {}", lineNumber);
>           return;
>         }
>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), 
> timestamp);
>       }
>     }
>     logger.info("setting watermark to {}", timestamp);
>     watermarkEstimator.setWatermark(timestamp);
>     logger.info("Finish processing {} in file {}", 
> tracker.currentRestriction(), filename);
>   }
>
>   private Instant getTimestamp(String filepath) {
>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>     String filename = Paths.get(filepath).getFileName().toString();
>     int index = filename.lastIndexOf(".complete");
>     if (index != -1) {
>       // In the case it has a suffix, strip it.
>       filename = filename.substring(0, index);
>     }
>     Instant timestamp =
>         Instant.parse(new 
> StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>     if (index != -1) {
>       // In the case it has a suffix i.e. it is complete, fast forward to the 
> next day.
>       return timestamp.plus(Duration.standardDays(1));
>     }
>     return timestamp;
>   }
>
>   @GetInitialRestriction
>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws 
> IOException {
>     long lineCount;
>     try (Stream<String> stream = 
> Files.lines(Paths.get(metadata.resourceId().toString()))) {
>       lineCount = stream.count();
>     }
>     return new OffsetRange(0L, lineCount);
>   }
>
>   @GetInitialWatermarkEstimatorState
>   public Instant getInitialWatermarkEstimatorState(
>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>     String filename = metadata.resourceId().toString();
>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>     // Compute and return the initial watermark estimator state for each 
> element and restriction.
>     // All subsequent processing of an element and restriction will be 
> restored from the existing
>     // state.
>     return getTimestamp(filename);
>   }
>
>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>     }
>     return timestamp;
>   }
>
>   @NewWatermarkEstimator
>   public WatermarkEstimators.Manual newWatermarkEstimator(
>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>     return new 
> WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>   }
> }
>
>
>
> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>
>> I'm working on a blog post[1] about splittable dofns that covers this
>> topic.
>>
>> The TLDR; is that FileIO.match() should allow users to control the
>> watermark estimator that is used and for your use case you should hold the
>> watermark to some computable value (e.g. the files are generated every hour
>> so once you know the last file has appeared for that hour you advance the
>> watermark to the current hour).
>>
>> 1:
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>
>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I am looking into:
>>> https://beam.apache.org/documentation/patterns/file-processing/ since I
>>> would like to create a continuous pipeline that reads from files and
>>> assigns Event Times based on e.g. file metadata or actual data inside the
>>> file. For example:
>>>
>>> private static void run(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>
>>>   PCollection<Metadata> matches = pipeline
>>>       .apply(FileIO.match()
>>>           .filepattern("/tmp/input/*")
>>>           .continuously(Duration.standardSeconds(15), 
>>> Watch.Growth.never()));
>>>   matches
>>>       .apply(ParDo.of(new ReadFileFn()))
>>>
>>>   pipeline.run();
>>> }
>>>
>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>   private static final Logger logger = 
>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>
>>>   @ProcessElement
>>>   public void processElement(ProcessContext c) throws IOException {
>>>     Metadata metadata = c.element();
>>>     // I believe c.timestamp() is based on processing time.
>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>     String filename = metadata.resourceId().toString();
>>>     // Output timestamps must be no earlier than the timestamp of the
>>>     // current input minus the allowed skew (0 milliseconds).
>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>     logger.info("lastModified @ {}", timestamp);
>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>       String line;
>>>       while ((line = br.readLine()) != null) {
>>>         c.outputWithTimestamp(line, c.timestamp());
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>>> Javadoc for details on changing the allowed skew.
>>>
>>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>>> event time for the PCollection<Metadata>. I can see that the call to
>>> continuously() makes the PCollection unbounded and assigns default
>>> Event Time. Without the call to continuously() I can assign the timestamps
>>> without problems either via c.outputWithTimestamp or WithTimestamp
>>> transform.
>>>
>>> I would like to know what is the way to fix the issue, and whether this
>>> use-case is currently supported in Beam.
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>

Reply via email to