Hi,

I am looking at
https://beam.apache.org/documentation/patterns/file-processing/ because I
would like to create a continuous pipeline that reads files and assigns
event times based on, e.g., file metadata or the data inside the file.
For example:

private static void run(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);

  PCollection<Metadata> matches = pipeline
      .apply(FileIO.match()
          .filepattern("/tmp/input/*")
          .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
  matches
      .apply(ParDo.of(new ReadFileFn()));

  pipeline.run();
}

private static final class ReadFileFn extends DoFn<Metadata, String> {
  private static final Logger logger =
      LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    Metadata metadata = c.element();
    // I believe c.timestamp() is based on processing time.
    logger.info("reading {} @ {}", metadata, c.timestamp());
    String filename = metadata.resourceId().toString();
    // Output timestamps must be no earlier than the timestamp of the
    // current input minus the allowed skew (0 milliseconds).
    Instant timestamp = new Instant(metadata.lastModifiedMillis());
    logger.info("lastModified @ {}", timestamp);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      while ((line = br.readLine()) != null) {
        // Use the file's last-modified time as the event time of each line.
        c.outputWithTimestamp(line, timestamp);
      }
    }
  }
}

The issue is that when calling c.outputWithTimestamp() I am getting:

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp
1970-01-01T00:00:00.000Z. Output timestamps must be no earlier than the
timestamp of the current input (2020-10-08T20:39:44.286Z) minus the allowed
skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
details on changing the allowed skew.
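
My reading of that message, as a minimal stand-alone sketch: an output
timestamp is accepted only if it is not earlier than the input timestamp
minus the allowed skew. This is plain java.time, not Beam code, and the
class and method names are made up for illustration; the two timestamps
are the ones from the exception above.

```java
import java.time.Duration;
import java.time.Instant;

public class TimestampSkewSketch {
    // Hypothetical helper modelling the rule quoted in the exception:
    // output must be no earlier than (input - allowedSkew).
    static boolean isOutputAllowed(Instant output, Instant input, Duration allowedSkew) {
        Instant earliestAllowed = input.minus(allowedSkew);
        return !output.isBefore(earliestAllowed);
    }

    public static void main(String[] args) {
        Instant input = Instant.parse("2020-10-08T20:39:44.286Z"); // polled input timestamp
        Instant output = Instant.EPOCH;                            // 1970-01-01T00:00:00.000Z

        // Skew of 0 ms: the epoch is far earlier than the input, so it is rejected.
        System.out.println(isOutputAllowed(output, input, Duration.ZERO)); // false

        // Re-using the input timestamp is always accepted.
        System.out.println(isOutputAllowed(input, input, Duration.ZERO));  // true

        // A skew covering the whole gap would make the epoch acceptable.
        Duration gap = Duration.between(output, input);
        System.out.println(isOutputAllowed(output, input, gap));           // true
    }
}
```

So with a skew of 0 only timestamps at or after the input element's
timestamp pass, which matches the failure I am seeing.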

I believe this is because MatchPollFn.apply() uses Instant.now() as the
event time for the PCollection<Metadata>. As far as I can see, the call to
continuously() makes the PCollection unbounded and assigns this default
event time to its elements. Without the call to continuously() I can
assign the timestamps without problems, either via
c.outputWithTimestamp() or the WithTimestamps transform.

I would like to know how to fix this issue, and whether this use case is
currently supported in Beam.

-- 
Best regards,
Piotr
