Got it, thank you for the clarification.

I tried to run the pipeline locally, with the following main (see full
source code attached):

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  Pipeline pipeline = Pipeline.create(options);
  logger.info("running");

  PCollection<KV<String, Long>> output =
      FileProcessing.getData(
          pipeline, "/tmp/input/*", Duration.standardSeconds(1),
Growth.never());

  output
      .apply("Window", Window.into(FixedWindows.of(Duration.standardDays(1))))
      .apply("LogWindowed", Log.ofElements("testGetData"))
      .apply(Sum.longsPerKey())
      .apply(
          "FormatResults",
          MapElements.into(TypeDescriptors.strings())
              .via((KV<String, Long> kv) -> String.format("{},{}",
kv.getKey(), kv.getValue())))
      .apply("LogResults", Log.ofElements("results"))
      .apply(
          TextIO.write()
              .to(Paths.get("/tmp/output/").resolve("Results").toString())
              .withWindowedWrites()
              .withNumShards(1));

  pipeline.run();
}


Then I am generating files using:

for i in {01..30}; do echo "handling $i"; echo "1\n2\n3\n4" >
/tmp/input/1985-10-$i; sleep 2; touch /tmp/input/1985-10-$i.complete; sleep
2; done

I do not see any outputs being generated though. Can you elaborate why that
might be? I would suspect that once the watermark is set to day+1, the
results of the previous day should be finalized and hence the result for a
given window should be outputted.

On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:

> I think you should be using the largest "complete" timestamp from the
> metadata results and not be setting the watermark if you don't have one.
>
> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Thank you so much for the input, that was extremely helpful!
>>
>> I changed the pipeline from using FileIO.match() into using a custom
>> matching (very similar to the FileIO.match()) that looks as follows:
>>
>> static PCollection<KV<String, Long>> getData(
>>     Pipeline pipeline,
>>     String filepattern,
>>     Duration pollInterval,
>>     TerminationCondition terminationCondition) {
>>   final Growth<String, MatchResult.Metadata, String> 
>> stringMetadataStringGrowth =
>>       Watch.growthOf(
>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new 
>> ExtractFilenameFn())
>>           .withPollInterval(pollInterval)
>>           .withTerminationPerInput(terminationCondition);
>>   return pipeline
>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>       .apply("Continuously match filepatterns", stringMetadataStringGrowth)
>>       .apply(Values.create())
>>       .apply(ParDo.of(new ReadFileFn()));
>> }
>>
>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>   private static final Logger logger = 
>> LoggerFactory.getLogger(MatchPollFn.class);
>>
>>   @Override
>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, 
>> Context c)
>>       throws Exception {
>>     // Here we only have the filepattern i.e. element, and hence we do not 
>> know what the timestamp
>>     // and/or watermark should be. As a result, we output EPOCH as both the 
>> timestamp and the
>>     // watermark.
>>     Instant instant = Instant.EPOCH;
>>     return Watch.Growth.PollResult.incomplete(
>>             instant, FileSystems.match(element, 
>> EmptyMatchTreatment.ALLOW).metadata())
>>         .withWatermark(instant);
>>   }
>> }
>>
>> private static class ExtractFilenameFn implements 
>> SerializableFunction<Metadata, String> {
>>   @Override
>>   public String apply(MatchResult.Metadata input) {
>>     return input.resourceId().toString();
>>   }
>> }
>>
>> The above together with fixing the bugs that Luke pointed out (Thank you
>> Luke!), makes the unit test pass.
>>
>> Thank you again!
>>
>> If you have any feedback for the current code, I would appreciate it. I
>> am especially interested whether setting event time and watermark in
>> *MatchPollFn* to *EPOCH* is a correct way to go.
>>
>>
>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>
>>> FYI this is a major limitation in FileIO.match's watermarking ability. I
>>> believe there is a JIRA issue about this, but nobody has ever worked on
>>> improving it.
>>>
>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> FileIO.match doesn't allow one to configure how the watermark advances
>>>> and it assumes that the watermark during polling is always the current
>>>> system time[1].
>>>>
>>>> Because of this the downstream watermark advancement is limited. When
>>>> an element and restriction starts processing, the maximum you can hold the
>>>> output watermark back by for this element and restriction pair is limited
>>>> to the current input watermark (a common value to use is the current
>>>> element's timestamp as the lower bound for all future output but if that
>>>> element is late the output you produce may or may not be late (depends on
>>>> downstream windowing strategy)). Holding this watermark back is important
>>>> since many of these elements and restrictions could be processed in
>>>> parallel at different rates.
>>>>
>>>> Based upon your implementation, you wouldn't need to control the
>>>> watermark from the file reading splittable DoFn if FileIO.match allowed you
>>>> to say what the watermark is after each polling round and allowed you to
>>>> set the timestamp for each match found. This initial setting of the
>>>> watermark during polling would be properly handled by the runner to block
>>>> watermark advancement for those elements.
>>>>
>>>> Minor comments not related to your issue but would improve your
>>>> implementation:
>>>> 1) Typically you set the watermark right before returning. You are
>>>> missing this from the failed tryClaim loop return.
>>>> 2) You should structure your loop not based upon the end of the current
>>>> restriction but continue processing till tryClaim fails. For example:
>>>>       @ProcessElement
>>>>       public void processElement(@Element String fileName,
>>>> RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Integer>
>>>> outputReceiver) throws IOException {
>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>         seekToNextRecordBoundaryInFile(file,
>>>> tracker.currentRestriction().getFrom());
>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>           outputReceiver.output(readNextRecord(file));
>>>>         }
>>>>       }
>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>> possible data issue since the code either parsed some filename incorrectly.
>>>> It is likely that you copied this from Beam code and it is used there
>>>> because user implementations of UnboundedSource were incorrectly setting
>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>
>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thank you for a quick response. I tried to follow the doc attached and
>>>>> read existing Beam code that uses the Splittable DoFns, and I made some
>>>>> progress.
>>>>>
>>>>> I created a simple pipeline that matches given filepattern, and uses
>>>>> splittable dofn to control event times and watermarks. The pipeline 
>>>>> expects
>>>>> files with the following name patterns:
>>>>>
>>>>> *yyyy-MM-dd*
>>>>>
>>>>> *yyyy-MM-dd.complete*
>>>>>
>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>
>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty) it
>>>>> calls *watermarkEstimator.setWatermark(timestamp)*, where timestamp
>>>>> is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it advances to the
>>>>> next day.
>>>>>
>>>>> I am at the point when the following unit test fails the inWindow()
>>>>> assertions, the last assertion passes. It seems that even though I
>>>>> call watermarkEstimator.setWatermark() the window is not being closed.
>>>>>
>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>
>>>>> Here is a unit test. The function being tested is getData() defined
>>>>> below.
>>>>>
>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>
>>>>>   IntervalWindow firstWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), 
>>>>> duration);
>>>>>   logger.info("first window {}", firstWindow);
>>>>>   IntervalWindow secondWindow =
>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), 
>>>>> duration);
>>>>>   logger.info("second window {}", secondWindow);
>>>>>
>>>>>   MatchConfiguration matchConfiguration =
>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>           .continuously(
>>>>>               Duration.millis(100),
>>>>>               
>>>>> Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>
>>>>>   PCollection<KV<String, Long>> output =
>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() + 
>>>>> "/*", matchConfiguration)
>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>
>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>
>>>>>   Thread writer =
>>>>>       new Thread(
>>>>>           () -> {
>>>>>             try {
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPath = tmpFolder.newFile("2020-01-01").toPath();
>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path firstPathComplete = 
>>>>> tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPath = tmpFolder.newFile("2020-01-02").toPath();
>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>
>>>>>               Thread.sleep(1000);
>>>>>
>>>>>               Path secondPathComplete = 
>>>>> tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>
>>>>>             } catch (IOException | InterruptedException e) {
>>>>>               throw new RuntimeException(e);
>>>>>             }
>>>>>           });
>>>>>   writer.start();
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(firstWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), 
>>>>> KV.of("my-key", 3L));
>>>>>
>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>   PAssert.that(output)
>>>>>       .inWindow(secondWindow)
>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), 
>>>>> KV.of("my-key", 6L));
>>>>>
>>>>>   // THIS ASSERTION PASSES.
>>>>>   PAssert.that(output)
>>>>>       .containsInAnyOrder(
>>>>>           KV.of("my-key", 1L),
>>>>>           KV.of("my-key", 2L),
>>>>>           KV.of("my-key", 3L),
>>>>>           KV.of("my-key", 4L),
>>>>>           KV.of("my-key", 5L),
>>>>>           KV.of("my-key", 6L));
>>>>>
>>>>>   p.run();
>>>>>
>>>>>   writer.join();
>>>>> }
>>>>>
>>>>> Here is the code. Essentially, I am using *FileIO.match()* to match
>>>>> filepattern. Then the file *Metadata* is processed by my custom
>>>>> Splittable DoFn.
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration 
>>>>> matchConfiguration) {
>>>>>   PCollection<Metadata> matches =
>>>>>       pipeline.apply(
>>>>>           
>>>>> FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>   return matches.apply(ParDo.of(new 
>>>>> ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>> }
>>>>>
>>>>> /**
>>>>>  * Processes matched files by outputting key-value pairs where key is 
>>>>> equal to "my-key" and values
>>>>>  * are Long values corresponding to the lines in the file. In the case 
>>>>> file does not contain one
>>>>>  * Long per line, IOException is thrown.
>>>>>  */
>>>>> @DoFn.BoundedPerElement
>>>>> private static final class ReadFileFn extends DoFn<Metadata, KV<String, 
>>>>> Long>> {
>>>>>   private static final Logger logger = 
>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>
>>>>>   @ProcessElement
>>>>>   public void processElement(
>>>>>       ProcessContext c,
>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>       throws IOException {
>>>>>     Metadata metadata = c.element();
>>>>>     logger.info(
>>>>>         "reading {} with restriction {} @ {}",
>>>>>         metadata,
>>>>>         tracker.currentRestriction(),
>>>>>         c.timestamp());
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>     try (BufferedReader br = new BufferedReader(new 
>>>>> FileReader(filename))) {
>>>>>       String line;
>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; 
>>>>> ++lineNumber) {
>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>           continue;
>>>>>         }
>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>           return;
>>>>>         }
>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), 
>>>>> timestamp);
>>>>>       }
>>>>>     }
>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>     logger.info("Finish processing {} in file {}", 
>>>>> tracker.currentRestriction(), filename);
>>>>>   }
>>>>>
>>>>>   private Instant getTimestamp(String filepath) {
>>>>>     // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix, strip it.
>>>>>       filename = filename.substring(0, index);
>>>>>     }
>>>>>     Instant timestamp =
>>>>>         Instant.parse(new 
>>>>> StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>     if (index != -1) {
>>>>>       // In the case it has a suffix i.e. it is complete, fast forward to 
>>>>> the next day.
>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @GetInitialRestriction
>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) 
>>>>> throws IOException {
>>>>>     long lineCount;
>>>>>     try (Stream<String> stream = 
>>>>> Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>       lineCount = stream.count();
>>>>>     }
>>>>>     return new OffsetRange(0L, lineCount);
>>>>>   }
>>>>>
>>>>>   @GetInitialWatermarkEstimatorState
>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>       @Element Metadata metadata, @Restriction OffsetRange restriction) {
>>>>>     String filename = metadata.resourceId().toString();
>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>     // Compute and return the initial watermark estimator state for each 
>>>>> element and restriction.
>>>>>     // All subsequent processing of an element and restriction will be 
>>>>> restored from the existing
>>>>>     // state.
>>>>>     return getTimestamp(filename);
>>>>>   }
>>>>>
>>>>>   private static Instant ensureTimestampWithinBounds(Instant timestamp) {
>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>     }
>>>>>     return timestamp;
>>>>>   }
>>>>>
>>>>>   @NewWatermarkEstimator
>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>     return new 
>>>>> WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>
>>>>>> I'm working on a blog post[1] about splittable dofns that covers this
>>>>>> topic.
>>>>>>
>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>> watermark estimator that is used and for your use case you should hold 
>>>>>> the
>>>>>> watermark to some computable value (e.g. the files are generated every 
>>>>>> hour
>>>>>> so once you know the last file has appeared for that hour you advance the
>>>>>> watermark to the current hour).
>>>>>>
>>>>>> 1:
>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>
>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking into:
>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>> since I would like to create a continuous pipeline that reads from files
>>>>>>> and assigns Event Times based on e.g. file metadata or actual data 
>>>>>>> inside
>>>>>>> the file. For example:
>>>>>>>
>>>>>>> private static void run(String[] args) {
>>>>>>>   PipelineOptions options = 
>>>>>>> PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>
>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>       .apply(FileIO.match()
>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>           .continuously(Duration.standardSeconds(15), 
>>>>>>> Watch.Growth.never()));
>>>>>>>   matches
>>>>>>>       .apply(ParDo.of(new ReadFileFn()))
>>>>>>>
>>>>>>>   pipeline.run();
>>>>>>> }
>>>>>>>
>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> {
>>>>>>>   private static final Logger logger = 
>>>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>
>>>>>>>   @ProcessElement
>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>     Metadata metadata = c.element();
>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>     try (BufferedReader br = new BufferedReader(new 
>>>>>>> FileReader(filename))) {
>>>>>>>       String line;
>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> The issue is that when calling c.outputWithTimestamp() I am getting:
>>>>>>>
>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no earlier
>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) minus
>>>>>>> the allowed skew (0 milliseconds). See the 
>>>>>>> DoFn#getAllowedTimestampSkew()
>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>
>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>> call to continuously() makes the PCollection unbounded and assigns
>>>>>>> default Event Time. Without the call to continuously() I can assign the
>>>>>>> timestamps without problems either via c.outputWithTimestamp or
>>>>>>> WithTimestamp transform.
>>>>>>>
>>>>>>> I would like to know what is the way to fix the issue, and whether
>>>>>>> this use-case is currently supported in Beam.
>>>>>>>
>>>>>>> --
>>>>>>> Best regards,
>>>>>>> Piotr
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>
>> --
>> Best regards,
>> Piotr
>>
>

-- 
Best regards,
Piotr

Attachment: FileProcessing.java
Description: Binary data

Reply via email to