Made it work e2e. Thank you all for the help!
On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <[email protected]>
wrote:
> Got it, thank you for the clarification.
>
> I tried to run the pipeline locally, with the following main (see full
> source code attached):
>
> public static void main(String[] args) {
>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>   Pipeline pipeline = Pipeline.create(options);
>   logger.info("running");
>
>   PCollection<KV<String, Long>> output =
>       FileProcessing.getData(
>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), Growth.never());
>
>   output
>       .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
>       .apply("LogWindowed", Log.ofElements("testGetData"))
>       .apply(Sum.longsPerKey())
>       .apply(
>           "FormatResults",
>           MapElements.into(TypeDescriptors.strings())
>               // Note: String.format uses %-style placeholders, not SLF4J-style "{}".
>               .via((KV<String, Long> kv) -> String.format("%s,%d", kv.getKey(), kv.getValue())))
>       .apply("LogResults", Log.ofElements("results"))
>       .apply(
>           TextIO.write()
>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>               .withWindowedWrites()
>               .withNumShards(1));
>
>   pipeline.run();
> }
>
>
> Then I am generating files using:
>
> for i in {01..30}; do
>   echo "handling $i"
>   printf '1\n2\n3\n4\n' > /tmp/input/1985-10-$i
>   sleep 2
>   touch /tmp/input/1985-10-$i.complete
>   sleep 2
> done
>
> I do not see any outputs being generated though. Can you elaborate on why
> that might be? I would expect that once the watermark advances to day+1, the
> previous day's window should be finalized and its result emitted.
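That expectation relies on Beam's default epoch-aligned fixed windows. The assignment itself can be sketched in plain Java (a simplified model, not Beam's actual FixedWindows implementation):

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowAssign {
    // Start of the fixed window containing ts, for epoch-aligned windows of the
    // given size (the default alignment for Beam's FixedWindows.of(size)).
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long startMs = Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs;
        return Instant.ofEpochMilli(startMs);
    }

    public static void main(String[] args) {
        // An element at 13:00 on 1985-10-05 lands in the window starting at midnight;
        // that window can only fire once the watermark passes 1985-10-06T00:00:00Z.
        System.out.println(windowStart(Instant.parse("1985-10-05T13:00:00Z"), Duration.ofDays(1)));
    }
}
```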
>
> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:
>
>> I think you should be using the largest "complete" timestamp from the
>> metadata results and not be setting the watermark if you don't have one.
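One way to implement that suggestion — take the watermark from the newest ".complete" marker seen so far, and leave it unset otherwise — is sketched below in plain Java (names hypothetical; a real poll function would derive the filenames from the MatchResult.Metadata):

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

public class WatermarkFromCompleteMarkers {
    // Returns the watermark implied by the newest ".complete" marker:
    // the day after the marked date, or empty if no marker has appeared yet
    // (in which case the poll result should not set a watermark at all).
    static Optional<Instant> watermarkFor(List<String> filenames) {
        return filenames.stream()
            .filter(f -> f.endsWith(".complete"))
            .map(f -> f.substring(0, f.length() - ".complete".length()))
            .map(day -> Instant.parse(day + "T00:00:00Z").plusSeconds(86400))
            .max(Instant::compareTo);
    }

    public static void main(String[] args) {
        System.out.println(watermarkFor(
            List.of("1985-10-01", "1985-10-01.complete", "1985-10-02")));
    }
}
```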
>>
>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Thank you so much for the input, that was extremely helpful!
>>>
>>> I changed the pipeline from using FileIO.match() to a custom matching
>>> transform (very similar to FileIO.match()) that looks as follows:
>>>
>>> static PCollection<KV<String, Long>> getData(
>>>     Pipeline pipeline,
>>>     String filepattern,
>>>     Duration pollInterval,
>>>     TerminationCondition terminationCondition) {
>>>   final Growth<String, MatchResult.Metadata, String> stringMetadataStringGrowth =
>>>       Watch.growthOf(
>>>               Contextful.of(new MatchPollFn(), Requirements.empty()),
>>>               new ExtractFilenameFn())
>>>           .withPollInterval(pollInterval)
>>>           .withTerminationPerInput(terminationCondition);
>>>   return pipeline
>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>>       .apply(Values.create())
>>>       .apply(ParDo.of(new ReadFileFn()));
>>> }
>>>
>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>   private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);
>>>
>>>   @Override
>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
>>>       throws Exception {
>>>     // Here we only have the filepattern, i.e. the element, and hence we do not know
>>>     // what the timestamp and/or watermark should be. As a result, we output EPOCH as
>>>     // both the timestamp and the watermark.
>>>     Instant instant = Instant.EPOCH;
>>>     return Watch.Growth.PollResult.incomplete(
>>>             instant, FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata())
>>>         .withWatermark(instant);
>>>   }
>>> }
>>>
>>> private static class ExtractFilenameFn implements SerializableFunction<Metadata, String> {
>>>   @Override
>>>   public String apply(MatchResult.Metadata input) {
>>>     return input.resourceId().toString();
>>>   }
>>> }
>>>
>>> The above, together with fixing the bugs that Luke pointed out (thank you,
>>> Luke!), makes the unit test pass.
>>>
>>> Thank you again!
>>>
>>> If you have any feedback on the current code, I would appreciate it. I am
>>> especially interested in whether setting the event time and watermark in
>>> *MatchPollFn* to *EPOCH* is the correct way to go.
>>>
>>>
>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>>
>>>> FYI this is a major limitation in FileIO.match's watermarking ability.
>>>> I believe there is a JIRA issue about this, but nobody has ever worked on
>>>> improving it.
>>>>
>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>>> and it assumes that the watermark during polling is always the current
>>>>> system time[1].
>>>>>
>>>>> Because of this, downstream watermark advancement is limited. When an
>>>>> element and restriction start processing, the furthest you can hold the
>>>>> output watermark back for that element and restriction pair is the current
>>>>> input watermark. A common choice is the current element's timestamp as the
>>>>> lower bound for all future output, but if that element is late, the output
>>>>> you produce may or may not be late (depending on the downstream windowing
>>>>> strategy). Holding this watermark back is important since many of these
>>>>> elements and restrictions could be processed in parallel at different rates.
>>>>>
>>>>> Based upon your implementation, you wouldn't need to control the
>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed
>>>>> you
>>>>> to say what the watermark is after each polling round and allowed you to
>>>>> set the timestamp for each match found. This initial setting of the
>>>>> watermark during polling would be properly handled by the runner to block
>>>>> watermark advancement for those elements.
>>>>>
>>>>> Minor comments, not related to your issue, that would improve your
>>>>> implementation:
>>>>> 1) Typically you set the watermark right before returning. You are
>>>>> missing this from the failed tryClaim loop return.
>>>>> 2) You should structure your loop to continue processing until tryClaim
>>>>> fails, rather than basing it on the end of the current restriction. For
>>>>> example:
>>>>> @ProcessElement
>>>>> public void processElement(
>>>>>     @Element String fileName,
>>>>>     RestrictionTracker<OffsetRange, Long> tracker,
>>>>>     OutputReceiver<Integer> outputReceiver) throws IOException {
>>>>>   RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>   seekToNextRecordBoundaryInFile(file, tracker.currentRestriction().getFrom());
>>>>>   while (tracker.tryClaim(file.getFilePointer())) {
>>>>>     outputReceiver.output(readNextRecord(file));
>>>>>   }
>>>>> }
>>>>> 3) ensureTimestampWithinBounds is dangerous since you are masking a
>>>>> possible data issue where the code parsed some filename incorrectly. It is
>>>>> likely that you copied this from Beam code; it is used there because user
>>>>> implementations of UnboundedSource were incorrectly setting the watermark
>>>>> outside of the bounds and there is no way to fix them.
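A stricter alternative to clamping, per the comment above, is to fail loudly when a parsed timestamp falls outside the valid range. A plain-Java sketch (the bounds shown are illustrative stand-ins for BoundedWindow.TIMESTAMP_MIN_VALUE and TIMESTAMP_MAX_VALUE):

```java
import java.time.Instant;

public class TimestampValidation {
    // Illustrative bounds; Beam's actual limits are BoundedWindow.TIMESTAMP_MIN_VALUE
    // and BoundedWindow.TIMESTAMP_MAX_VALUE.
    static final Instant MIN = Instant.parse("0001-01-01T00:00:00Z");
    static final Instant MAX = Instant.parse("9999-12-31T23:59:59Z");

    // Throws on out-of-range timestamps rather than silently clamping,
    // so a bad filename parse surfaces as an error instead of bad data.
    static Instant requireWithinBounds(Instant t) {
        if (t.isBefore(MIN) || t.isAfter(MAX)) {
            throw new IllegalArgumentException("timestamp out of bounds: " + t);
        }
        return t;
    }

    public static void main(String[] args) {
        System.out.println(requireWithinBounds(Instant.parse("2020-01-01T00:00:00Z")));
    }
}
```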
>>>>>
>>>>> 1:
>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>
>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made
>>>>>> some
>>>>>> progress.
>>>>>>
>>>>>> I created a simple pipeline that matches a given filepattern and uses a
>>>>>> splittable DoFn to control event times and watermarks. The pipeline
>>>>>> expects files with the following name patterns:
>>>>>>
>>>>>> *yyyy-MM-dd*
>>>>>>
>>>>>> *yyyy-MM-dd.complete*
>>>>>>
>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>
>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to
>>>>>> the next day.
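That filename-to-timestamp convention can be checked in isolation. A plain-Java sketch of the mapping described above (hypothetical helper, mirroring the getTimestamp logic further down):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class FilenameTimestamps {
    // Maps "yyyy-MM-dd" to midnight of that day, and "yyyy-MM-dd.complete"
    // to midnight of the following day (the watermark advance).
    static Instant timestampFor(String filename) {
        boolean complete = filename.endsWith(".complete");
        String day =
            complete ? filename.substring(0, filename.length() - ".complete".length()) : filename;
        Instant midnight = Instant.parse(day + "T00:00:00.000Z");
        return complete ? midnight.plus(1, ChronoUnit.DAYS) : midnight;
    }

    public static void main(String[] args) {
        System.out.println(timestampFor("2020-01-01"));
        System.out.println(timestampFor("2020-01-01.complete"));
    }
}
```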
>>>>>>
>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>> assertions, while the last assertion passes. It seems that even though I
>>>>>> call watermarkEstimator.setWatermark(), the window is not being closed.
>>>>>>
>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>
>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>> below.
>>>>>>
>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>
>>>>>>   IntervalWindow firstWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), duration);
>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>   IntervalWindow secondWindow =
>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), duration);
>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>
>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>           .continuously(
>>>>>>               Duration.millis(100),
>>>>>>               Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>
>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>       FileProcessing.getData(
>>>>>>               p, tmpFolder.getRoot().getAbsolutePath() + "/*", matchConfiguration)
>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>
>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>
>>>>>>   Thread writer =
>>>>>>       new Thread(
>>>>>>           () -> {
>>>>>>             try {
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path firstPathComplete = tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>
>>>>>>               Thread.sleep(1000);
>>>>>>
>>>>>>               Path secondPathComplete = tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>               throw new RuntimeException(e);
>>>>>>             }
>>>>>>           });
>>>>>>   writer.start();
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(firstWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), KV.of("my-key", 3L));
>>>>>>
>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>   PAssert.that(output)
>>>>>>       .inWindow(secondWindow)
>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), KV.of("my-key", 6L));
>>>>>>
>>>>>>   // THIS ASSERTION PASSES.
>>>>>>   PAssert.that(output)
>>>>>>       .containsInAnyOrder(
>>>>>>           KV.of("my-key", 1L),
>>>>>>           KV.of("my-key", 2L),
>>>>>>           KV.of("my-key", 3L),
>>>>>>           KV.of("my-key", 4L),
>>>>>>           KV.of("my-key", 5L),
>>>>>>           KV.of("my-key", 6L));
>>>>>>
>>>>>>   p.run();
>>>>>>
>>>>>>   writer.join();
>>>>>> }
>>>>>>
>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match the
>>>>>> filepattern. Then the file *Metadata* is processed by my custom
>>>>>> Splittable DoFn.
>>>>>>
>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration matchConfiguration) {
>>>>>>   PCollection<Metadata> matches =
>>>>>>       pipeline.apply(
>>>>>>           FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>   return matches.apply(ParDo.of(new ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>> }
>>>>>>
>>>>>> /**
>>>>>>  * Processes matched files by outputting key-value pairs where the key is "my-key" and
>>>>>>  * the values are Longs corresponding to the lines in the file. If the file does not
>>>>>>  * contain one Long per line, a NumberFormatException is thrown.
>>>>>>  */
>>>>>> @DoFn.BoundedPerElement
>>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>
>>>>>>   @ProcessElement
>>>>>>   public void processElement(
>>>>>>       ProcessContext c,
>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>       throws IOException {
>>>>>>     Metadata metadata = c.element();
>>>>>>     logger.info(
>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>         metadata,
>>>>>>         tracker.currentRestriction(),
>>>>>>         c.timestamp());
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>       String line;
>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; ++lineNumber) {
>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>           continue;
>>>>>>         }
>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>           return;
>>>>>>         }
>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), timestamp);
>>>>>>       }
>>>>>>     }
>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>     logger.info("Finish processing {} in file {}", tracker.currentRestriction(), filename);
>>>>>>   }
>>>>>>
>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>     // The filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>     if (index != -1) {
>>>>>>       // If it has the suffix, strip it.
>>>>>>       filename = filename.substring(0, index);
>>>>>>     }
>>>>>>     Instant timestamp = Instant.parse(filename + "T00:00:00.000Z");
>>>>>>     if (index != -1) {
>>>>>>       // If it had the suffix, i.e. the day is complete, fast-forward to the next day.
>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialRestriction
>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) throws IOException {
>>>>>>     long lineCount;
>>>>>>     try (Stream<String> stream =
>>>>>>         Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>       lineCount = stream.count();
>>>>>>     }
>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>   }
>>>>>>
>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>     // Compute and return the initial watermark estimator state for each element and
>>>>>>     // restriction. All subsequent processing of an element and restriction will be
>>>>>>     // restored from the existing state.
>>>>>>     return getTimestamp(filename);
>>>>>>   }
>>>>>>
>>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>     }
>>>>>>     return timestamp;
>>>>>>   }
>>>>>>
>>>>>>   @NewWatermarkEstimator
>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>     return new WatermarkEstimators.Manual(
>>>>>>         ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>> this topic.
>>>>>>>
>>>>>>> The TL;DR is that FileIO.match() should allow users to control the
>>>>>>> watermark estimator that is used. For your use case, you should hold the
>>>>>>> watermark to some computable value (e.g. the files are generated every
>>>>>>> hour, so once you know the last file has appeared for that hour, you
>>>>>>> advance the watermark to the current hour).
>>>>>>>
>>>>>>> 1:
>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>
>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am looking into:
>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>> since I would like to create a continuous pipeline that reads from
>>>>>>>> files
>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data
>>>>>>>> inside
>>>>>>>> the file. For example:
>>>>>>>>
>>>>>>>> private static void run(String[] args) {
>>>>>>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>
>>>>>>>>   PCollection<Metadata> matches =
>>>>>>>>       pipeline.apply(
>>>>>>>>           FileIO.match()
>>>>>>>>               .filepattern("/tmp/input/*")
>>>>>>>>               .continuously(Duration.standardSeconds(15), Watch.Growth.never()));
>>>>>>>>   matches.apply(ParDo.of(new ReadFileFn()));
>>>>>>>>
>>>>>>>>   pipeline.run();
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>>   private static final Logger logger = LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>     try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
>>>>>>>>       String line;
>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>>
>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no
>>>>>>>> earlier than the timestamp of the current input
>>>>>>>> (2020-10-08T20:39:44.286Z) minus the allowed skew (0 milliseconds).
>>>>>>>> See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing
>>>>>>>> the allowed skew.
>>>>>>>>
>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as the
>>>>>>>> event time for the PCollection<Metadata>. I can see that the call to
>>>>>>>> continuously() makes the PCollection unbounded and assigns a default
>>>>>>>> event time. Without the call to continuously(), I can assign the
>>>>>>>> timestamps without problems, either via c.outputWithTimestamp or the
>>>>>>>> WithTimestamps transform.
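The check behind the error above can be modeled directly: an output timestamp is rejected when it is earlier than the input element's timestamp minus the allowed skew (zero by default). A plain-Java sketch of that rule:

```java
import java.time.Duration;
import java.time.Instant;

public class SkewCheck {
    // Mirrors the validation described in the error message: outputs may not be
    // earlier than the input element's timestamp minus the allowed skew.
    static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        return !outputTs.isBefore(inputTs.minus(allowedSkew));
    }

    public static void main(String[] args) {
        // Outputting EPOCH for an input stamped "now" fails with zero skew,
        // which is exactly the exception quoted above.
        System.out.println(isAllowed(
            Instant.parse("2020-10-08T20:39:44.286Z"), Instant.EPOCH, Duration.ZERO));
    }
}
```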
>>>>>>>>
>>>>>>>> I would like to know how to fix this issue, and whether this use case
>>>>>>>> is currently supported in Beam.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best regards,
>>>>>> Piotr
>>>>>>
>>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>
> --
> Best regards,
> Piotr
>
--
Best regards,
Piotr