Thank you for a quick response. I tried to follow the doc attached and read
existing Beam code that uses the Splittable DoFns, and I made some progress.

I created a simple pipeline that matches given filepattern, and uses
splittable dofn to control event times and watermarks. The pipeline expects
files with the following name patterns:

*yyyy-MM-dd*

*yyyy-MM-dd.complete*

Every time it sees *yyyy-MM-dd*, it reads its contents and outputs lines of
the file using *outputWithTimestamp(..., timestamp)*. Additionally, it
calls *watermarkEstimator.setWatermark(timestamp)*. In both cases the
timestamp is *yyyy-MM-ddT00:00:00.000Z*.

Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it calls
*watermarkEstimator.setWatermark(timestamp)*, where timestamp is
*yyyy-MM-ddT00:00:00.000Z
plus one day* - hence it advances to the next day.

I am at the point when the following unit test fails the inWindow()
assertions, the last assertion passes. It seems that even though I
call watermarkEstimator.setWatermark() the window is not being closed.

I would appreciate help/suggestions on what I am missing.

Here is a unit test. The function being tested is getData() defined below.

public void testGetDataWithNewFiles() throws InterruptedException {
  final Duration duration = Duration.standardDays(1);

  IntervalWindow firstWindow =
      new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
  logger.info("first window {}", firstWindow);
  IntervalWindow secondWindow =
      new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
  logger.info("second window {}", secondWindow);

  MatchConfiguration matchConfiguration =
      MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
          .continuously(
              Duration.millis(100),
              
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));

  PCollection<KV<String, Long>> output =
      FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath()
+ "/*", matchConfiguration)
          .apply("Window", Window.into(FixedWindows.of(duration)))
          .apply("LogWindowedResult", Log.ofElements("testGetData"));

  assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());

  Thread writer =
      new Thread(
          () -> {
            try {
              Thread.sleep(1000);

              Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
              Files.write(firstPath, Arrays.asList("1", "2", "3"));

              Thread.sleep(1000);

              Path firstPathComplete =
tmpFolder.newFile("2020-01-01.complete").toPath();
              Files.write(firstPathComplete, Arrays.asList());

              Thread.sleep(1000);

              Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
              Files.write(secondPath, Arrays.asList("4", "5", "6"));

              Thread.sleep(1000);

              Path secondPathComplete =
tmpFolder.newFile("2020-01-02.complete").toPath();
              Files.write(secondPathComplete, Arrays.asList());

            } catch (IOException | InterruptedException e) {
              throw new RuntimeException(e);
            }
          });
  writer.start();

  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(firstWindow)
      .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L),
KV.of("my-key", 3L));

  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(secondWindow)
      .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L),
KV.of("my-key", 6L));

  // THIS ASSERTION PASSES.
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of("my-key", 1L),
          KV.of("my-key", 2L),
          KV.of("my-key", 3L),
          KV.of("my-key", 4L),
          KV.of("my-key", 5L),
          KV.of("my-key", 6L));

  p.run();

  writer.join();
}

Here is the code. Essentially, I am using *FileIO.match()* to match
filepattern. Then the file *Metadata* is processed by my custom Splittable
DoFn.

static PCollection<KV<String, Long>> getData(
    Pipeline pipeline, String filepattern, MatchConfiguration
matchConfiguration) {
  PCollection<Metadata> matches =
      pipeline.apply(
          
FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
  return matches.apply(ParDo.of(new
ReadFileFn())).apply(Log.ofElements("Get Data"));
}

/**
 * Processes matched files by outputting key-value pairs where key is
equal to "my-key" and values
 * are Long values corresponding to the lines in the file. In the case
file does not contain one
 * Long per line, IOException is thrown.
 */
@DoFn.BoundedPerElement
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  private static final Logger logger =
LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator)
      throws IOException {
    Metadata metadata = c.element();
    logger.info(
        "reading {} with restriction {} @ {}",
        metadata,
        tracker.currentRestriction(),
        c.timestamp());
    String filename = metadata.resourceId().toString();
    Instant timestamp = getTimestamp(filename);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
        if (lineNumber < tracker.currentRestriction().getFrom()
            || lineNumber >= tracker.currentRestriction().getTo()) {
          continue;
        }
        if (!tracker.tryClaim(lineNumber)) {
          logger.info("failed to claim {}", lineNumber);
          return;
        }
        c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
      }
    }
    logger.info("setting watermark to {}", timestamp);
    watermarkEstimator.setWatermark(timestamp);
    logger.info("Finish processing {} in file {}",
tracker.currentRestriction(), filename);
  }

  private Instant getTimestamp(String filepath) {
    // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
    String filename = Paths.get(filepath).getFileName().toString();
    int index = filename.lastIndexOf(".complete");
    if (index != -1) {
      // In the case it has a suffix, strip it.
      filename = filename.substring(0, index);
    }
    Instant timestamp =
        Instant.parse(new
StringBuilder().append(filename).append("T00:00:00.000Z").toString());
    if (index != -1) {
      // In the case it has a suffix i.e. it is complete, fast forward
to the next day.
      return timestamp.plus(Duration.standardDays(1));
    }
    return timestamp;
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element Metadata metadata)
throws IOException {
    long lineCount;
    try (Stream<String> stream =
Files.lines(Paths.get(metadata.resourceId().toString()))) {
      lineCount = stream.count();
    }
    return new OffsetRange(0L, lineCount);
  }

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(
      @Element Metadata metadata, @Restriction OffsetRange restriction) {
    String filename = metadata.resourceId().toString();
    logger.info("getInitialWatermarkEstimatorState {}", filename);
    // Compute and return the initial watermark estimator state for
each element and restriction.
    // All subsequent processing of an element and restriction will be
restored from the existing
    // state.
    return getTimestamp(filename);
  }

  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    return timestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant watermarkEstimatorState) {
    logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
    return new 
WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
  }
}



On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:

> I'm working on a blog post[1] about splittable dofns that covers this
> topic.
>
> The TLDR; is that FileIO.match() should allow users to control the
> watermark estimator that is used and for your use case you should hold the
> watermark to some computable value (e.g. the files are generated every hour
> so once you know the last file has appeared for that hour you advance the
> watermark to the current hour).
>
> 1:
> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>
> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am looking into:
>> https://beam.apache.org/documentation/patterns/file-processing/ since I
>> would like to create a continuous pipeline that reads from files and
>> assigns Event Times based on e.g. file metadata or actual data inside the
>> file. For example:
>>
>> private static void run(String[] args) {
>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>   Pipeline pipeline = Pipeline.create(options);
>>
>>   PCollection<Metadata> matches = pipeline
>>       .apply(FileIO.match()
>>           .filepattern("/tmp/input/*")
>>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>   matches
>>       .apply(ParDo.of(new ReadFileFn()))
>>
>>   pipeline.run();
>> }
>>
>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>   private static final Logger logger = 
>> LoggerFactory.getLogger(ReadFileFn.class);
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext c) throws IOException {
>>     Metadata metadata = c.element();
>>     // I believe c.timestamp() is based on processing time.
>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>     String filename = metadata.resourceId().toString();
>>     // Output timestamps must be no earlier than the timestamp of the
>>     // current input minus the allowed skew (0 milliseconds).
>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>     logger.info("lastModified @ {}", timestamp);
>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>       String line;
>>       while ((line = br.readLine()) != null) {
>>         c.outputWithTimestamp(line, c.timestamp());
>>       }
>>     }
>>   }
>> }
>>
>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>
>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>> Javadoc for details on changing the allowed skew.
>>
>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>> event time for the PCollection<Metadata>. I can see that the call to
>> continuously() makes the PCollection unbounded and assigns default Event
>> Time. Without the call to continuously() I can assign the timestamps
>> without problems either via c.outputWithTimestamp or WithTimestamp
>> transform.
>>
>> I would like to know what is the way to fix the issue, and whether this
>> use-case is currently supported in Beam.
>>
>> --
>> Best regards,
>> Piotr
>>
>

-- 
Best regards,
Piotr

Reply via email to