Got it, thank you for the clarification. I tried to run the pipeline locally with the following main method (see full source code attached):
public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  logger.info("running");
  PCollection<KV<String, Long>> output =
      FileProcessing.getData(
          pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
  output
      .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
      .apply("LogWindowed", Log.ofElements("testGetData"))
      .apply(Sum.longsPerKey())
      .apply(
          "FormatResults",
          MapElements.into(TypeDescriptors.strings())
              .via((KV<String, Long> kv) -> String.format("%s,%d", kv.getKey(), kv.getValue())))
      .apply("LogResults", Log.ofElements("results"))
      .apply(
          TextIO.write()
              .to(Paths.get("/tmp/output/").resolve("Results").toString())
              .withWindowedWrites()
              .withNumShards(1));
  pipeline.run();
}
Then I am generating files using:
for i in {01..30}; do
  echo "handling $i"
  printf "1\n2\n3\n4\n" > /tmp/input/1985-10-$i
  sleep 2
  touch /tmp/input/1985-10-$i.complete
  sleep 2
done
I do not see any outputs being generated, though. Could you elaborate on
why that might be? I would expect that once the watermark advances to
day+1, the results for the previous day should be finalized, and hence the
result for a given window should be emitted.
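For reference, a quick standalone check of which daily window an element lands in (plain java.time, not Beam; the `DailyWindow`/`windowStart` names are just illustrative), mirroring FixedWindows.of(Duration.standardDays(1)) with the default zero offset:

```java
import java.time.Duration;
import java.time.Instant;

public class DailyWindow {
    // Start of the fixed one-day window containing ts, mirroring
    // FixedWindows.of(Duration.standardDays(1)) with the default zero offset.
    static Instant windowStart(Instant ts) {
        long sizeMillis = Duration.ofDays(1).toMillis();
        long startMillis = ts.toEpochMilli() - Math.floorMod(ts.toEpochMilli(), sizeMillis);
        return Instant.ofEpochMilli(startMillis);
    }

    public static void main(String[] args) {
        // An element stamped anywhere on 1985-10-01 belongs to the window
        // [1985-10-01T00:00:00Z, 1985-10-02T00:00:00Z); it can only be
        // emitted once the watermark passes the window end.
        System.out.println(windowStart(Instant.parse("1985-10-01T12:00:00Z")));
        // 1985-10-01T00:00:00Z
    }
}
```

So the results for 1985-10-01 should only appear once the watermark passes 1985-10-02T00:00:00Z, which is what the .complete marker is meant to trigger.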
On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:
> I think you should use the largest "complete" timestamp from the
> metadata results, and not set the watermark if you don't have one.
>
> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Thank you so much for the input, that was extremely helpful!
>>
>> I changed the pipeline from using FileIO.match() to using a custom
>> matching function (very similar to FileIO.match()) that looks as follows:
>>
>> static PCollection<KV<String, Long>> getData(
>>     Pipeline pipeline,
>>     String filepattern,
>>     Duration pollInterval,
>>     TerminationCondition terminationCondition) {
>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>       Watch.growthOf(
>>               Contextful.of(new MatchPollFn(), Requirements.empty()),
>>               new ExtractFilenameFn())
>>           .withPollInterval(pollInterval)
>>           .withTerminationPerInput(terminationCondition);
>>   return pipeline
>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>       .apply(Values.create())
>>       .apply(ParDo.of(new ReadFileFn()));
>> }
>>
>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>
>>   @Override
>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>       throws Exception {
>>     // Here we only have the filepattern, i.e. element, and hence we do not know
>>     // what the timestamp and/or watermark should be. As a result, we output EPOCH
>>     // as both the timestamp and the watermark.
>>     Instant instant = Instant.EPOCH;
>>     return Watch.Growth.PollResult.incomplete(
>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>         .withWatermark(instant);
>>   }
>> }
>>
>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>   @Override
>>   public String apply(MatchResult.Metadata input) {
>>     return input.resourceId().toString();
>>   }
>> }
>>
>> The above, together with fixing the bugs that Luke pointed out (thank
>> you, Luke!), makes the unit test pass.
>>
>> Thank you again!
>>
>> If you have any feedback on the current code, I would appreciate it. I
>> am especially interested in whether setting the event time and watermark
>> in *MatchPollFn* to *EPOCH* is the correct way to go.
>>
>>
>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>
>>> FYI this is a major limitation in FileIO.match's watermarking ability. I
>>> believe there is a JIRA issue about this, but nobody has ever worked on
>>> improving it.
>>>
>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>> and it assumes that the watermark during polling is always the current
>>>> system time[1].
>>>>
>>>> Because of this, the downstream watermark advancement is limited. When
>>>> an element and restriction starts processing, the furthest you can hold
>>>> the output watermark back for that element-and-restriction pair is the
>>>> current input watermark. (A common value to use is the current element's
>>>> timestamp as the lower bound for all future output, but if that element
>>>> is late, the output you produce may or may not be late, depending on the
>>>> downstream windowing strategy.) Holding this watermark back is important
>>>> since many of these elements and restrictions could be processed in
>>>> parallel at different rates.
>>>>
>>>> Based upon your implementation, you wouldn't need to control the
>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>> to say what the watermark is after each polling round and allowed you to
>>>> set the timestamp for each match found. This initial setting of the
>>>> watermark during polling would be properly handled by the runner to block
>>>> watermark advancement for those elements.
>>>>
>>>> Minor comments, not related to your issue, that would improve your
>>>> implementation:
>>>> 1) Typically you set the watermark right before returning. You are
>>>> missing this on the return from the failed tryClaim loop.
>>>> 2) You should structure your loop not around the end of the current
>>>> restriction, but continue processing until tryClaim fails. For example:
>>>> @ProcessElement
>>>> public void processElement(
>>>>     @Element String fileName,
>>>>     RestrictionTracker<OffsetRange, Long> tracker,
>>>>     OutputReceiver<Integer> outputReceiver)
>>>>     throws IOException {
>>>>   RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>   seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>   while (tracker.tryClaim(file.getFilePointer())) {
>>>>     outputReceiver.output(readNextRecord(file));
>>>>   }
>>>> }
>>>> 3) ensureTimestampWithinBounds is dangerous, as you may be masking a
>>>> data issue where the code parsed some filename incorrectly. You likely
>>>> copied this from Beam code; it is used there because user
>>>> implementations of UnboundedSource were incorrectly setting the
>>>> watermark outside of the bounds, and there is no way to fix them.
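An alternative to silently clamping is to fail fast so a filename-parsing bug surfaces immediately. A minimal standalone sketch (plain java.time; the bounds are parameters standing in for BoundedWindow.TIMESTAMP_MIN_VALUE / TIMESTAMP_MAX_VALUE, and the names are illustrative):

```java
import java.time.Instant;

public class TimestampValidation {
    // Rejects timestamps outside [min, max] instead of clamping them, so a
    // bad filename parse fails loudly rather than being masked.
    static Instant checkTimestampWithinBounds(Instant ts, Instant min, Instant max) {
        if (ts.isBefore(min) || ts.isAfter(max)) {
            throw new IllegalArgumentException(
                "Timestamp " + ts + " outside [" + min + ", " + max
                    + "]; likely a filename parsing bug");
        }
        return ts;
    }

    public static void main(String[] args) {
        Instant min = Instant.parse("2000-01-01T00:00:00Z");
        Instant max = Instant.parse("2100-01-01T00:00:00Z");
        System.out.println(checkTimestampWithinBounds(
            Instant.parse("2020-01-01T00:00:00Z"), min, max));
        // 2020-01-01T00:00:00Z
    }
}
```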
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>
>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thank you for the quick response. I tried to follow the attached doc,
>>>>> read existing Beam code that uses splittable DoFns, and made some
>>>>> progress.
>>>>>
>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>> splittable DoFn to control event times and watermarks. The pipeline
>>>>> expects files with the following name patterns:
>>>>>
>>>>> *yyyy-MM-dd*
>>>>>
>>>>> *yyyy-MM-dd.complete*
>>>>>
>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs the
>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>
>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty), it
>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where the timestamp
>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day*; hence it advances to the
>>>>> next day.
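That filename-to-timestamp mapping can be sketched standalone with java.time only (the class and method names here are illustrative; the actual pipeline code is below):

```java
import java.time.Duration;
import java.time.Instant;

public class FilenameTimestamps {
    // "yyyy-MM-dd" maps to midnight UTC of that day; "yyyy-MM-dd.complete"
    // maps to midnight of the *next* day, which is what lets the watermark
    // advance past the day's window once the marker file appears.
    static Instant timestampFor(String filename) {
        boolean complete = filename.endsWith(".complete");
        String day = complete
            ? filename.substring(0, filename.length() - ".complete".length())
            : filename;
        Instant midnight = Instant.parse(day + "T00:00:00.000Z");
        return complete ? midnight.plus(Duration.ofDays(1)) : midnight;
    }

    public static void main(String[] args) {
        System.out.println(timestampFor("2020-01-01"));          // 2020-01-01T00:00:00Z
        System.out.println(timestampFor("2020-01-01.complete")); // 2020-01-02T00:00:00Z
    }
}
```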
>>>>>
>>>>> I am at the point where the following unit test fails the inWindow()
>>>>> assertions, while the last assertion passes. It seems that even though
>>>>> I call watermarkEstimator.setWatermark(), the window is not being
>>>>> closed.
>>>>>
>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>
>>>>> Here is the unit test. The function being tested is getData(), defined
>>>>> below.
>>>>>
>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>
>>>>>   IntervalWindow firstWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>   logger.info("first window {}", firstWindow);
>>>>>   IntervalWindow secondWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>   logger.info("second window {}", secondWindow);
>>>>>
>>>>>   MatchConfiguration matchConfiguration =
>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>           .continuously(
>>>>>               Duration.millis(100),
>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>
>>>>>   PCollection<KV<String, Long>> output =
>>>>>       FileProcessing.getData(
>>>>>               p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>
>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>
>>>>>   Thread writer =
>>>>>       new Thread(
>>>>>           () -> {
>>>>>             try {
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPathComplete =
>>>>>                   tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPathComplete =
>>>>>                   tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>             } catch (IOException | InterruptedException e) {
>>>>>               throw new RuntimeException(e);
>>>>>             }
>>>>>           });
>>>>>   writer.start();
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(firstWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(secondWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>
>>>>>   // THIS ASSERTION PASSES.
>>>>>   PAssert.that(output)
>>>>>       .containsInAnyOrder(
>>>>>           KV.of("my-key", 1L),
>>>>>           KV.of("my-key", 2L),
>>>>>           KV.of("my-key", 3L),
>>>>>           KV.of("my-key", 4L),
>>>>>           KV.of("my-key", 5L),
>>>>>           KV.of("my-key", 6L));
>>>>>
>>>>>   p.run();
>>>>>
>>>>>   writer.join();
>>>>> }
>>>>>
>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match the
>>>>> filepattern. The matched file *Metadata* is then processed by my custom
>>>>> splittable DoFn.
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>   PCollection<Metadata> matches =
>>>>>       pipeline.apply(
>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>   return matches
>>>>>       .apply(ParDo.of(new ReadFileFn()))
>>>>>       .apply(Log.ofElements("Get Data"));
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Processes matched files by outputting key-value pairs where the key is
>>>>>  * "my-key" and the values are Longs corresponding to the lines in the file.
>>>>>  * If the file does not contain one Long per line, a NumberFormatException
>>>>>  * is thrown.
>>>>>  */
>>>>> @DoFn.BoundedPerElement
>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>
>>>>>   @ProcessElement
>>>>>   public void processElement(
>>>>>       ProcessContext c,
>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>       throws IOException {
>>>>>     Metadata metadata = c.element();
>>>>>     logger.info(
>>>>>         "reading {} with restriction {} @ {}",
>>>>>         metadata,
>>>>>         tracker.currentRestriction(),
>>>>>         c.timestamp());
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>       String line;
>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>           continue;
>>>>>         }
>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>           return;
>>>>>         }
>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>       }
>>>>>     }
>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>   }
>>>>>
>>>>>   private Instant getTimestamp(String filepath) {
>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix, strip it.
>>>>>       filename = filename.substring(0, index);
>>>>>     }
>>>>>     Instant timestamp = Instant.parse(filename + "T00:00:00.000Z");
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix, i.e. it is complete, fast-forward to
>>>>>       // the next day.
>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @GetInitialRestriction
>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>     long lineCount;
>>>>>     try (Stream<String> stream = Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>       lineCount = stream.count();
>>>>>     }
>>>>>     return new OffsetRange(0L, lineCount);
>>>>>   }
>>>>>
>>>>>   @GetInitialWatermarkEstimatorState
>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>     // Compute and return the initial watermark estimator state for each element
>>>>>     // and restriction. All subsequent processing of an element and restriction
>>>>>     // will be restored from the existing state.
>>>>>     return getTimestamp(filename);
>>>>>   }
>>>>>
>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @NewWatermarkEstimator
>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>     return new WatermarkEstimators.Manual(
>>>>>         ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> I'm working on a blog post[1] about splittable DoFns that covers this
>>>>>> topic.
>>>>>>
>>>>>> The TL;DR is that FileIO.match() should allow users to control the
>>>>>> watermark estimator that is used. For your use case, you should hold
>>>>>> the watermark to some computable value (e.g. the files are generated
>>>>>> every hour, so once you know the last file has appeared for that hour,
>>>>>> you advance the watermark to the current hour).
>>>>>>
>>>>>> 1:
>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking into:
>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>> and assigns Event Times based on e.g. file metadata or actual data
>>>>>>> inside
>>>>>>> the file. For example:
>>>>>>>
>>>>>>> private static void run(String[] args) {
>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>   PCollection<Metadata> matches =
>>>>>>>       pipeline.apply(
>>>>>>>           FileIO.match()
>>>>>>>               .filepattern("/tmp/input/*")
>>>>>>>               .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>   matches.apply(ParDo.of(new ReadFileFn()));
>>>>>>>
>>>>>>>   pipeline.run();
>>>>>>> }
>>>>>>>
>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>
>>>>>>>   @ProcessElement
>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>     Metadata metadata = c.element();
>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>       String line;
>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>         c.outputWithTimestamp(line, timestamp);
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>> the allowed skew (0 milliseconds). See the
>>>>>>> DoFn#getAllowedTimestampSkew()
>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>
>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>> the event time for the PCollection<Metadata>. I can see that the call
>>>>>>> to continuously() makes the PCollection unbounded and assigns a
>>>>>>> default event time. Without the call to continuously(), I can assign
>>>>>>> the timestamps without problems, either via c.outputWithTimestamp()
>>>>>>> or the WithTimestamps transform.
>>>>>>>
>>>>>>> I would like to know how to fix this issue, and whether this use case
>>>>>>> is currently supported in Beam.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Piotr
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>
--
Best regards,
Piotr
[Attachment: FileProcessing.java]
