I'm working on a blog post[1] about splittable dofns that covers this topic.
The TL;DR is that FileIO.match() should allow users to control the watermark estimator that is used, and for your use case you should hold the watermark at some computable value (e.g. the files are generated every hour, so once you know the last file has appeared for that hour you advance the watermark to the current hour).

1: https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql

On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <[email protected]> wrote:

> Hi,
>
> I am looking into:
> https://beam.apache.org/documentation/patterns/file-processing/ since I
> would like to create a continuous pipeline that reads from files and
> assigns Event Times based on e.g. file metadata or actual data inside the
> file. For example:
>
> private static void run(String[] args) {
>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>   Pipeline pipeline = Pipeline.create(options);
>
>   PCollection<Metadata> matches = pipeline
>       .apply(FileIO.match()
>           .filepattern("/tmp/input/*")
>           .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>   matches
>       .apply(ParDo.of(new ReadFileFn()));
>
>   pipeline.run();
> }
>
> private static final class ReadFileFn extends DoFn<Metadata, String> {
>   private static final Logger logger =
>       LoggerFactory.getLogger(ReadFileFn.class);
>
>   @ProcessElement
>   public void processElement(ProcessContext c) throws IOException {
>     Metadata metadata = c.element();
>     // I believe c.timestamp() is based on processing time.
>     logger.info("reading {} @ {}", metadata, c.timestamp());
>     String filename = metadata.resourceId().toString();
>     // Output timestamps must be no earlier than the timestamp of the
>     // current input minus the allowed skew (0 milliseconds).
>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>     logger.info("lastModified @ {}", timestamp);
>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>       String line;
>       while ((line = br.readLine()) != null) {
>         c.outputWithTimestamp(line, c.timestamp());
>       }
>     }
>   }
> }
>
> The issue is that when calling c.outputWithTimestamp() I am getting:
>
> Caused by: java.lang.IllegalArgumentException: Cannot output with
> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
> Javadoc for details on changing the allowed skew.
>
> I believe this is because MatchPollFn.apply() uses Instant.now() as the
> event time for the PCollection<Metadata>. I can see that the call to
> continuously() makes the PCollection unbounded and assigns default Event
> Time. Without the call to continuously() I can assign the timestamps
> without problems either via c.outputWithTimestamp or the WithTimestamps
> transform.
>
> I would like to know what is the way to fix the issue, and whether this
> use-case is currently supported in Beam.
>
> --
> Best regards,
> Piotr
>
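
To make the watermark suggestion at the top of this reply a bit more concrete, here is a rough sketch of the bookkeeping only. It is not Beam's watermark estimator API and is not wired into FileIO.match(); the class name HourlyWatermarkHold and the expectedFilesPerHour parameter are hypothetical, and it assumes you know in advance how many files each hour produces. It also uses java.time.Instant rather than the Joda Instant that Beam uses, so that it runs on a plain JDK.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: holds a watermark at the start of the earliest hour
 * whose files have not all appeared yet. Illustration only, not a Beam API.
 */
final class HourlyWatermarkHold {
    // Assumption: the number of files generated per hour is known up front.
    private final int expectedFilesPerHour;
    // Files seen so far, keyed by the start of the hour they belong to.
    private final Map<Instant, Integer> filesSeen = new HashMap<>();
    // Start of the earliest hour that is still incomplete.
    private Instant watermark;

    HourlyWatermarkHold(Instant startHour, int expectedFilesPerHour) {
        if (expectedFilesPerHour < 1) {
            throw new IllegalArgumentException("expectedFilesPerHour must be >= 1");
        }
        this.watermark = startHour.truncatedTo(ChronoUnit.HOURS);
        this.expectedFilesPerHour = expectedFilesPerHour;
    }

    /** Records one file and returns the (possibly advanced) watermark. */
    Instant observeFile(Instant fileEventTime) {
        Instant hour = fileEventTime.truncatedTo(ChronoUnit.HOURS);
        if (hour.isBefore(watermark)) {
            // The file belongs to an hour already considered complete; ignore it here.
            return watermark;
        }
        filesSeen.merge(hour, 1, Integer::sum);
        // Advance past every consecutive hour whose last file has appeared.
        while (filesSeen.getOrDefault(watermark, 0) >= expectedFilesPerHour) {
            filesSeen.remove(watermark);
            watermark = watermark.plus(Duration.ofHours(1));
        }
        return watermark;
    }

    Instant currentWatermark() {
        return watermark;
    }
}
```

The point of the sketch is that the watermark never moves on processing time; it only advances once an hour is known to be complete, so elements stamped with timestamps inside a still-open hour are not treated as late.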
