Hi,
I am looking at the file-processing patterns page:
https://beam.apache.org/documentation/patterns/file-processing/
I would like to create a continuous pipeline that reads from files and
assigns event times based on, e.g., file metadata or the actual data inside
the file. For example:
private static void run(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);

  // Poll the input directory every 15 seconds, with no termination condition.
  PCollection<Metadata> matches = pipeline
      .apply(FileIO.match()
          .filepattern("/tmp/input/*")
          .continuously(Duration.standardSeconds(15), Watch.Growth.never()));

  matches
      .apply(ParDo.of(new ReadFileFn()));

  pipeline.run();
}
private static final class ReadFileFn extends DoFn<Metadata, String> {
  private static final Logger logger =
      LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    Metadata metadata = c.element();
    // I believe c.timestamp() is based on processing time.
    logger.info("reading {} @ {}", metadata, c.timestamp());
    String filename = metadata.resourceId().toString();
    // Output timestamps must be no earlier than the timestamp of the
    // current input minus the allowed skew (0 milliseconds).
    Instant timestamp = new Instant(metadata.lastModifiedMillis());
    logger.info("lastModified @ {}", timestamp);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      while ((line = br.readLine()) != null) {
        // Use the file's last-modified time as the event time.
        c.outputWithTimestamp(line, timestamp);
      }
    }
  }
}
The issue is that when calling c.outputWithTimestamp() I am getting:
Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp
1970-01-01T00:00:00.000Z. Output timestamps must be no earlier than the
timestamp of the current input (2020-10-08T20:39:44.286Z) minus the allowed
skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for
details on changing the allowed skew.
I believe this is because MatchPollFn.apply() uses Instant.now() as the
event time for the elements of the PCollection<Metadata>. I can see that the
call to continuously() makes the PCollection unbounded and assigns this
default event time. Without the call to continuously() I can assign the
timestamps without problems, either via c.outputWithTimestamp() or the
WithTimestamps transform.
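To spell out the rule I think I am hitting, here is a plain-Java sketch of
the check as I read it from the wording of the exception above. It has no
Beam dependencies and uses java.time rather than Joda; the class
TimestampSkewCheck and the helper isAllowed() are hypothetical names of my
own, not Beam APIs, and the real check inside Beam may differ in detail.

```java
import java.time.Duration;
import java.time.Instant;

final class TimestampSkewCheck {

  // Hypothetical helper (not a Beam API): an output timestamp is accepted
  // only if it is no earlier than the input timestamp minus the allowed skew.
  static boolean isAllowed(
      Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
    Instant earliestAllowed = inputTimestamp.minus(allowedSkew);
    return !outputTimestamp.isBefore(earliestAllowed);
  }

  public static void main(String[] args) {
    // Both values are taken from the exception message.
    Instant input = Instant.parse("2020-10-08T20:39:44.286Z");
    Instant output = Instant.EPOCH; // 1970-01-01T00:00:00.000Z

    // Zero skew, output far in the past: rejected.
    System.out.println(isAllowed(input, output, Duration.ZERO)); // false

    // Output equal to the input timestamp: accepted.
    System.out.println(isAllowed(input, input, Duration.ZERO)); // true

    // A skew that covers the whole gap: accepted.
    Duration gap = Duration.between(output, input);
    System.out.println(isAllowed(input, output, gap)); // true
  }
}
```

If this reading is right, any timestamp older than the moment the file was
matched is rejected unless the skew is raised via
DoFn#getAllowedTimestampSkew(), which is what the exception points to.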
I would like to know how to fix this issue, and whether this use case is
currently supported in Beam.
--
Best regards,
Piotr