Thank you for the quick response. I tried to follow the attached doc and read
existing Beam code that uses Splittable DoFns, and I made some progress.
I created a simple pipeline that matches a given filepattern and uses a
splittable DoFn to control event times and watermarks. The pipeline expects
files with the following name patterns:
*yyyy-MM-dd*
*yyyy-MM-dd.complete*
Every time it sees *yyyy-MM-dd*, it reads its contents and outputs lines of
the file using *outputWithTimestamp(..., timestamp)*. Additionally, it
calls *watermarkEstimator.setWatermark(timestamp)*. In both cases the
timestamp is *yyyy-MM-ddT00:00:00.000Z*.
Once the pipeline matches *yyyy-MM-dd.complete* (which is empty), it calls
*watermarkEstimator.setWatermark(timestamp)*, where timestamp is
*yyyy-MM-ddT00:00:00.000Z* plus one day - hence it advances the watermark to
the next day.
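To make the intended filename-to-watermark mapping concrete, here is a standalone sketch (it uses java.time for brevity, whereas the pipeline code below uses Joda-Time; timestampFor is just an illustrative stand-in for the getTimestamp() helper defined further down):

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkMapping {
  // Maps a matched filename to the event time / watermark used by the DoFn.
  static Instant timestampFor(String filename) {
    boolean complete = filename.endsWith(".complete");
    String date =
        complete
            ? filename.substring(0, filename.length() - ".complete".length())
            : filename;
    Instant midnight = Instant.parse(date + "T00:00:00.000Z");
    // A "*.complete" marker means the day is done, so the watermark
    // advances past the whole day.
    return complete ? midnight.plus(Duration.ofDays(1)) : midnight;
  }

  public static void main(String[] args) {
    System.out.println(timestampFor("2020-01-01"));          // 2020-01-01T00:00:00Z
    System.out.println(timestampFor("2020-01-01.complete")); // 2020-01-02T00:00:00Z
  }
}
```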
I am at the point where the following unit test fails the inWindow()
assertions, while the last assertion passes. It seems that even though I
call watermarkEstimator.setWatermark(), the window is not being closed.
I would appreciate help/suggestions on what I am missing.
Here is a unit test. The function being tested is getData() defined below.
@Test
public void testGetDataWithNewFiles() throws InterruptedException {
  final Duration duration = Duration.standardDays(1);
  IntervalWindow firstWindow =
      new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
  logger.info("first window {}", firstWindow);
  IntervalWindow secondWindow =
      new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
  logger.info("second window {}", secondWindow);

  MatchConfiguration matchConfiguration =
      MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
          .continuously(
              Duration.millis(100),
              Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));

  PCollection<KV<String, Long>> output =
      FileProcessing.getData(
              p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
          .apply("Window", Window.into(FixedWindows.of(duration)))
          .apply("LogWindowedResult", Log.ofElements("testGetData"));
  assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());

  // Write the input files on a separate thread while the pipeline runs.
  Thread writer =
      new Thread(
          () -> {
            try {
              Thread.sleep(1000);
              Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
              Files.write(firstPath, Arrays.asList("1", "2", "3"));
              Thread.sleep(1000);
              Path firstPathComplete =
                  tmpFolder.newFile("2020-01-01.complete").toPath();
              Files.write(firstPathComplete, Arrays.asList());
              Thread.sleep(1000);
              Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
              Files.write(secondPath, Arrays.asList("4", "5", "6"));
              Thread.sleep(1000);
              Path secondPathComplete =
                  tmpFolder.newFile("2020-01-02.complete").toPath();
              Files.write(secondPathComplete, Arrays.asList());
            } catch (IOException | InterruptedException e) {
              throw new RuntimeException(e);
            }
          });
  writer.start();

  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(firstWindow)
      .containsInAnyOrder(
          KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
  // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
  PAssert.that(output)
      .inWindow(secondWindow)
      .containsInAnyOrder(
          KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
  // THIS ASSERTION PASSES.
  PAssert.that(output)
      .containsInAnyOrder(
          KV.of("my-key", 1L),
          KV.of("my-key", 2L),
          KV.of("my-key", 3L),
          KV.of("my-key", 4L),
          KV.of("my-key", 5L),
          KV.of("my-key", 6L));

  p.run();
  writer.join();
}
Here is the code. Essentially, I am using *FileIO.match()* to match the
filepattern. The matched file *Metadata* is then processed by my custom
Splittable DoFn.
static PCollection<KV<String, Long>> getData(
    Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
  PCollection<Metadata> matches =
      pipeline.apply(
          FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
  return matches
      .apply(ParDo.of(new ReadFileFn()))
      .apply(Log.ofElements("Get Data"));
}
/**
 * Processes matched files by outputting key-value pairs where the key is "my-key" and the
 * values are Longs corresponding to the lines of the file. If a line cannot be parsed as
 * a Long, a NumberFormatException is thrown.
 */
@DoFn.BoundedPerElement
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator)
      throws IOException {
    Metadata metadata = c.element();
    logger.info(
        "reading {} with restriction {} @ {}",
        metadata,
        tracker.currentRestriction(),
        c.timestamp());
    String filename = metadata.resourceId().toString();
    Instant timestamp = getTimestamp(filename);
    try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
      String line;
      for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
        // Skip lines outside the current restriction.
        if (lineNumber < tracker.currentRestriction().getFrom()
            || lineNumber >= tracker.currentRestriction().getTo()) {
          continue;
        }
        if (!tracker.tryClaim(lineNumber)) {
          logger.info("failed to claim {}", lineNumber);
          return;
        }
        c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
      }
    }
    logger.info("setting watermark to {}", timestamp);
    watermarkEstimator.setWatermark(timestamp);
    logger.info(
        "Finish processing {} in file {}", tracker.currentRestriction(), filename);
  }
  private Instant getTimestamp(String filepath) {
    // The filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
    String filename = Paths.get(filepath).getFileName().toString();
    int index = filename.lastIndexOf(".complete");
    if (index != -1) {
      // In the case it has a suffix, strip it.
      filename = filename.substring(0, index);
    }
    Instant timestamp = Instant.parse(filename + "T00:00:00.000Z");
    if (index != -1) {
      // In the case it has a suffix, i.e. it is complete, fast-forward to the next day.
      return timestamp.plus(Duration.standardDays(1));
    }
    return timestamp;
  }
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
    long lineCount;
    try (Stream<String> stream =
        Files.lines(Paths.get(metadata.resourceId().toString()))) {
      lineCount = stream.count();
    }
    return new OffsetRange(0L, lineCount);
  }

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(
      @Element Metadata metadata, @Restriction OffsetRange restriction) {
    String filename = metadata.resourceId().toString();
    logger.info("getInitialWatermarkEstimatorState {}", filename);
    // Compute and return the initial watermark estimator state for each element and
    // restriction. All subsequent processing of an element and restriction will be
    // restored from the existing state.
    return getTimestamp(filename);
  }

  private static Instant ensureTimestampWithinBounds(Instant timestamp) {
    if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
    } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
      timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
    }
    return timestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant watermarkEstimatorState) {
    logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
    return new WatermarkEstimators.Manual(
        ensureTimestampWithinBounds(watermarkEstimatorState));
  }
}
On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
> I'm working on a blog post[1] about splittable dofns that covers this
> topic.
>
> The TLDR; is that FileIO.match() should allow users to control the
> watermark estimator that is used and for your use case you should hold the
> watermark to some computable value (e.g. the files are generated every hour
> so once you know the last file has appeared for that hour you advance the
> watermark to the current hour).
>
> 1:
> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>
> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am looking into:
>> https://beam.apache.org/documentation/patterns/file-processing/ since I
>> would like to create a continuous pipeline that reads from files and
>> assigns Event Times based on e.g. file metadata or actual data inside the
>> file. For example:
>>
>> private static void run(String[] args) {
>> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<Metadata> matches = pipeline
>> .apply(FileIO.match()
>> .filepattern("/tmp/input/*")
>> .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>> matches
>> .apply(ParDo.of(new ReadFileFn()))
>>
>> pipeline.run();
>> }
>>
>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>> private static final Logger logger =
>> LoggerFactory.getLogger(ReadFileFn.class);
>>
>> @ProcessElement
>> public void processElement(ProcessContext c) throws IOException {
>> Metadata metadata = c.element();
>> // I believe c.timestamp() is based on processing time.
>> logger.info("reading {} @ {}", metadata, c.timestamp());
>> String filename = metadata.resourceId().toString();
>> // Output timestamps must be no earlier than the timestamp of the
>> // current input minus the allowed skew (0 milliseconds).
>> Instant timestamp = new Instant(metadata.lastModifiedMillis());
>> logger.info("lastModified @ {}", timestamp);
>> try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>> String line;
>> while ((line = br.readLine()) != null) {
>> c.outputWithTimestamp(line, c.timestamp());
>> }
>> }
>> }
>> }
>>
>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>
>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>> the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew()
>> Javadoc for details on changing the allowed skew.
>>
>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>> event time for the PCollection<Metadata>. I can see that the call to
>> continuously() makes the PCollection unbounded and assigns default Event
>> Time. Without the call to continuously() I can assign the timestamps
>> without problems either via c.outputWithTimestamp or WithTimestamp
>> transform.
>>
>> I would like to know what is the way to fix the issue, and whether this
>> use-case is currently supported in Beam.
>>
>> --
>> Best regards,
>> Piotr
>>
>
--
Best regards,
Piotr