Here is the simplified code that works e2e:

static class GetData extends PTransform<PBegin, PCollection<KV<String, Long>>> {

  private final String filepattern;
  private final Duration pollInterval;
  private final TerminationCondition terminationCondition;

  GetData(
      String filepattern, Duration pollInterval, TerminationCondition terminationCondition) {
    this.filepattern = filepattern;
    this.pollInterval = pollInterval;
    this.terminationCondition = terminationCondition;
  }

  @Override
  public PCollection<KV<String, Long>> expand(PBegin input) {
    final Growth<String, MatchResult.Metadata, MatchResult.Metadata>
        stringMetadataStringGrowth =
        Watch.growthOf(new MatchPollFn())
            .withPollInterval(pollInterval)
            .withTerminationPerInput(terminationCondition);
    return input
        .apply("Create filepattern", Create.<String>of(filepattern))
        .apply("Continuously match filepatterns", stringMetadataStringGrowth)
        .apply(Values.create())
        .apply(ParDo.of(new ReadFileFn()));
  }
}

/**
 * Outputs hardcoded values, or nothing if the file name ends with ".complete". The event times
 * and watermarks are unchanged.
 */
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Metadata metadata = c.element();
    String filename = metadata.resourceId().toString();
    if (filename.endsWith(".complete")) {
      return;
    }
    c.output(KV.of("my-key", 1L));
    c.output(KV.of("my-key", 2L));
  }
}

/**
 * Matches input filepattern and outputs matched file {@code Metadata}. The timestamps of the
 * values outputted are based on the filenames, which are assumed to be either yyyy-MM-dd or
 * yyyy-MM-dd.complete. The watermark is set to the maximum of all the outputted event timestamps.
 */
private static class MatchPollFn extends PollFn<String, Metadata> {
  private static final Logger logger = LoggerFactory.getLogger(MatchPollFn.class);

  @Override
  public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
      throws Exception {
    final List<Metadata> metadataList =
        FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
    List<TimestampedValue<Metadata>> outputs = new LinkedList<>();
    Instant watermark = Instant.EPOCH;
    for (Metadata metadata : metadataList) {
      String filename = metadata.resourceId().toString();
      final Instant timestamp = getTimestamp(filename);
      outputs.add(TimestampedValue.of(metadata, timestamp));
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }
    logger.info("outputting watermark {}", watermark);
    return Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);
  }
}
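
Luke's suggestion further down the thread was to derive the watermark only from the ".complete" markers and to leave it unset when no marker has been seen. Below is a minimal sketch of that selection logic, not the code I am running. It uses plain java.time instead of Joda-Time so that it compiles without Beam on the classpath, and the class name CompleteMarkerWatermark and the method watermarkFor are made up for illustration. It also assumes bare file names rather than full paths.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Beam or of the pipeline above.
final class CompleteMarkerWatermark {
  private static final String SUFFIX = ".complete";

  /**
   * Returns the start of the day following the latest yyyy-MM-dd.complete marker, or empty if
   * no marker is present. File names are assumed to be bare names, not full paths.
   */
  static Optional<Instant> watermarkFor(List<String> filenames) {
    return filenames.stream()
        // Only completion markers may advance the watermark.
        .filter(name -> name.endsWith(SUFFIX))
        .map(name -> name.substring(0, name.length() - SUFFIX.length()))
        .map(LocalDate::parse)
        // A completed day moves the watermark to midnight UTC of the next day.
        .map(day -> day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant())
        .max(Comparator.naturalOrder());
  }
}
```

In MatchPollFn this would roughly mean calling withWatermark(...) on the PollResult only when the Optional is present, instead of falling back to EPOCH.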

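// Assumed definition: the original message does not show this field. Parses yyyy-MM-dd as UTC.
private static final DateTimeFormatter dateTimeFormatter =
    DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();

/**
 * Returns {@link Instant} based on the {@code filepath}. Filename is assumed to be either
 * yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is "yyyy-MM-ddT00:00:00.000Z" for
 * yyyy-MM-dd and "yyyy-MM-ddT00:00:00.000Z"+24h for yyyy-MM-dd.complete.
 */
public static Instant getTimestamp(String filepath) {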
  String filename = Paths.get(filepath).getFileName().toString();
  int index = filename.lastIndexOf(".complete");
  if (index != -1) {
    // In the case it has a suffix, strip it.
    filename = filename.substring(0, index);
  }
  Instant timestamp = Instant.parse(filename, dateTimeFormatter);
  if (index != -1) {
    // In the case it has a suffix i.e. it is complete, fast forward to the next day.
    return timestamp.plus(Duration.standardDays(1));
  }
  return timestamp;
}
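
To make the arithmetic concrete: with one-day fixed windows, the data file for a day lands at the start of that day's window, and the ".complete" marker maps to the start of the next day, which is exactly the end of that window. The sketch below walks through this with plain java.time (a stand-in for Joda-Time so that it runs without Beam). The class DailyWindowSketch and its methods are hypothetical, and windowClosed only mirrors the usual "watermark has reached the end of the window" rule under default triggering. It is not a Beam API.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical illustration only; none of these names exist in Beam.
final class DailyWindowSketch {
  private static final String SUFFIX = ".complete";
  private static final long DAY_MILLIS = Duration.ofDays(1).toMillis();

  /** Midnight UTC of yyyy-MM-dd, or of the following day for yyyy-MM-dd.complete. */
  static Instant timestampFor(String filename) {
    boolean complete = filename.endsWith(SUFFIX);
    String day =
        complete ? filename.substring(0, filename.length() - SUFFIX.length()) : filename;
    Instant start = LocalDate.parse(day).atStartOfDay(ZoneOffset.UTC).toInstant();
    return complete ? start.plus(Duration.ofDays(1)) : start;
  }

  /** Start of the epoch-aligned one-day window that contains the timestamp. */
  static Instant windowStart(Instant timestamp) {
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, DAY_MILLIS));
  }

  /** True once the watermark has reached the end of the window [start, start + 1 day). */
  static boolean windowClosed(Instant windowStart, Instant watermark) {
    return !watermark.isBefore(windowStart.plus(Duration.ofDays(1)));
  }
}
```

For 1985-10-01, the data timestamp alone does not close the window, while the timestamp derived from 1985-10-01.complete does.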




On Mon, Oct 19, 2020 at 9:56 AM Luke Cwik <[email protected]> wrote:

> For future reference, what did you have to change to get it to work?
>
> On Thu, Oct 15, 2020 at 2:40 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Made it work e2e. Thank you all for the help!
>>
>> On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Got it, thank you for the clarification.
>>>
>>> I tried to run the pipeline locally, with the following main (see full
>>> source code attached):
>>>
>>> public static void main(String[] args) {
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>   Pipeline pipeline = Pipeline.create(options);
>>>   logger.info("running");
>>>
>>>   PCollection<KV<String, Long>> output =
>>>       FileProcessing.getData(
>>>           pipeline, "/tmp/input/*", Duration.standardSeconds(1), 
>>> Growth.never());
>>>
>>>   output
>>>       .apply("Window", 
>>> Window.into(FixedWindows.of(Duration.standardDays(1))))
>>>       .apply("LogWindowed", Log.ofElements("testGetData"))
>>>       .apply(Sum.longsPerKey())
>>>       .apply(
>>>           "FormatResults",
>>>           MapElements.into(TypeDescriptors.strings())
>>>               .via((KV<String, Long> kv) -> String.format("%s,%s", 
>>> kv.getKey(), kv.getValue())))
>>>       .apply("LogResults", Log.ofElements("results"))
>>>       .apply(
>>>           TextIO.write()
>>>               .to(Paths.get("/tmp/output/").resolve("Results").toString())
>>>               .withWindowedWrites()
>>>               .withNumShards(1));
>>>
>>>   pipeline.run();
>>> }
>>>
>>>
>>> Then I am generating files using:
>>>
>>> for i in {01..30}; do echo "handling $i"; echo "1\n2\n3\n4" >
>>> /tmp/input/1985-10-$i; sleep 2; touch /tmp/input/1985-10-$i.complete; sleep
>>> 2; done
>>>
>>> I do not see any outputs being generated though. Can you elaborate why
>>> that might be? I would suspect that once the watermark is set to day+1, the
>>> results of the previous day should be finalized and hence the result for a
>>> given window should be outputted.
>>>
>>> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> I think you should be using the largest "complete" timestamp from the
>>>> metadata results and not be setting the watermark if you don't have one.
>>>>
>>>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thank you so much for the input, that was extremely helpful!
>>>>>
>>>>> I changed the pipeline from using FileIO.match() into using a custom
>>>>> matching (very similar to the FileIO.match()) that looks as follows:
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>>     Pipeline pipeline,
>>>>>     String filepattern,
>>>>>     Duration pollInterval,
>>>>>     TerminationCondition terminationCondition) {
>>>>>   final Growth<String, MatchResult.Metadata, String> 
>>>>> stringMetadataStringGrowth =
>>>>>       Watch.growthOf(
>>>>>               Contextful.of(new MatchPollFn(), Requirements.empty()), new 
>>>>> ExtractFilenameFn())
>>>>>           .withPollInterval(pollInterval)
>>>>>           .withTerminationPerInput(terminationCondition);
>>>>>   return pipeline
>>>>>       .apply("Create filepattern", Create.<String>of(filepattern))
>>>>>       .apply("Continuously match filepatterns", 
>>>>> stringMetadataStringGrowth)
>>>>>       .apply(Values.create())
>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>> }
>>>>>
>>>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>>>   private static final Logger logger = 
>>>>> LoggerFactory.getLogger(MatchPollFn.class);
>>>>>
>>>>>   @Override
>>>>>   public Watch.Growth.PollResult<MatchResult.Metadata> apply(String 
>>>>> element, Context c)
>>>>>       throws Exception {
>>>>>     // Here we only have the filepattern i.e. element, and hence we do 
>>>>> not know what the timestamp
>>>>>     // and/or watermark should be. As a result, we output EPOCH as both 
>>>>> the timestamp and the
>>>>>     // watermark.
>>>>>     Instant instant = Instant.EPOCH;
>>>>>     return Watch.Growth.PollResult.incomplete(
>>>>>             instant, FileSystems.match(element, 
>>>>> EmptyMatchTreatment.ALLOW).metadata())
>>>>>         .withWatermark(instant);
>>>>>   }
>>>>> }
>>>>>
>>>>> private static class ExtractFilenameFn implements 
>>>>> SerializableFunction<Metadata, String> {
>>>>>   @Override
>>>>>   public String apply(MatchResult.Metadata input) {
>>>>>     return input.resourceId().toString();
>>>>>   }
>>>>> }
>>>>>
>>>>> The above together with fixing the bugs that Luke pointed out (Thank
>>>>> you Luke!), makes the unit test pass.
>>>>>
>>>>> Thank you again!
>>>>>
>>>>> If you have any feedback for the current code, I would appreciate it.
>>>>> I am especially interested whether setting event time and watermark in
>>>>> *MatchPollFn* to *EPOCH* is a correct way to go.
>>>>>
>>>>>
>>>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> FYI this is a major limitation in FileIO.match's watermarking
>>>>>> ability. I believe there is a JIRA issue about this, but nobody has ever
>>>>>> worked on improving it.
>>>>>>
>>>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> FileIO.match doesn't allow one to configure how the watermark
>>>>>>> advances and it assumes that the watermark during polling is always the
>>>>>>> current system time[1].
>>>>>>>
>>>>>>> Because of this the downstream watermark advancement is limited.
>>>>>>> When an element and restriction starts processing, the maximum you can 
>>>>>>> hold
>>>>>>> the output watermark back by for this element and restriction pair is
>>>>>>> limited to the current input watermark (a common value to use is the
>>>>>>> current element's timestamp as the lower bound for all future output 
>>>>>>> but if
>>>>>>> that element is late the output you produce may or may not be late 
>>>>>>> (depends
>>>>>>> on downstream windowing strategy)). Holding this watermark back is
>>>>>>> important since many of these elements and restrictions could be 
>>>>>>> processed
>>>>>>> in parallel at different rates.
>>>>>>>
>>>>>>> Based upon your implementation, you wouldn't need to control the
>>>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed 
>>>>>>> you
>>>>>>> to say what the watermark is after each polling round and allowed you to
>>>>>>> set the timestamp for each match found. This initial setting of the
>>>>>>> watermark during polling would be properly handled by the runner to 
>>>>>>> block
>>>>>>> watermark advancement for those elements.
>>>>>>>
>>>>>>> Minor comments not related to your issue but would improve your
>>>>>>> implementation:
>>>>>>> 1) Typically you set the watermark right before returning. You are
>>>>>>> missing this from the failed tryClaim loop return.
>>>>>>> 2) You should structure your loop not based upon the end of the
>>>>>>> current restriction but continue processing till tryClaim fails. For
>>>>>>> example:
>>>>>>>       @ProcessElement
>>>>>>>       public void processElement(@Element String fileName,
>>>>>>> RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Integer>
>>>>>>> outputReceiver) throws IOException {
>>>>>>>         RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>>>         seekToNextRecordBoundaryInFile(file,
>>>>>>> tracker.currentRestriction().getFrom());
>>>>>>>         while (tracker.tryClaim(file.getFilePointer())) {
>>>>>>>           outputReceiver.output(readNextRecord(file));
>>>>>>>         }
>>>>>>>       }
>>>>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>>>>> possible data issue, since the code may have parsed some filename
>>>>>>> incorrectly.
>>>>>>> It is likely that you copied this from Beam code and it is used there
>>>>>>> because user implementations of UnboundedSource were incorrectly setting
>>>>>>> the watermark outside of the bounds and there is no way to fix them.
>>>>>>>
>>>>>>> 1:
>>>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>>>
>>>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made 
>>>>>>>> some
>>>>>>>> progress.
>>>>>>>>
>>>>>>>> I created a simple pipeline that matches given filepattern, and
>>>>>>>> uses splittable dofn to control event times and watermarks. The 
>>>>>>>> pipeline
>>>>>>>> expects files with the following name patterns:
>>>>>>>>
>>>>>>>> *yyyy-MM-dd*
>>>>>>>>
>>>>>>>> *yyyy-MM-dd.complete*
>>>>>>>>
>>>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>>>
>>>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty)
>>>>>>>> it calls *watermarkEstimator.setWatermark(timestamp)*, where
>>>>>>>> timestamp is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it
>>>>>>>> advances to the next day.
>>>>>>>>
>>>>>>>> I am at the point when the following unit test fails the inWindow()
>>>>>>>> assertions, the last assertion passes. It seems that even though I
>>>>>>>> call watermarkEstimator.setWatermark() the window is not being closed.
>>>>>>>>
>>>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>>>
>>>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>>>> below.
>>>>>>>>
>>>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>>>   final Duration duration = Duration.standardDays(1);
>>>>>>>>
>>>>>>>>   IntervalWindow firstWindow =
>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"), 
>>>>>>>> duration);
>>>>>>>>   logger.info("first window {}", firstWindow);
>>>>>>>>   IntervalWindow secondWindow =
>>>>>>>>       new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"), 
>>>>>>>> duration);
>>>>>>>>   logger.info("second window {}", secondWindow);
>>>>>>>>
>>>>>>>>   MatchConfiguration matchConfiguration =
>>>>>>>>       MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>>>           .continuously(
>>>>>>>>               Duration.millis(100),
>>>>>>>>               
>>>>>>>> Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>>>
>>>>>>>>   PCollection<KV<String, Long>> output =
>>>>>>>>       FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath() 
>>>>>>>> + "/*", matchConfiguration)
>>>>>>>>           .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>>>           .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>>>
>>>>>>>>   assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>>>
>>>>>>>>   Thread writer =
>>>>>>>>       new Thread(
>>>>>>>>           () -> {
>>>>>>>>             try {
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path firstPath = 
>>>>>>>> tmpFolder.newFile("2020-01-01").toPath();
>>>>>>>>               Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path firstPathComplete = 
>>>>>>>> tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>>>               Files.write(firstPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path secondPath = 
>>>>>>>> tmpFolder.newFile("2020-01-02").toPath();
>>>>>>>>               Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>>>
>>>>>>>>               Thread.sleep(1000);
>>>>>>>>
>>>>>>>>               Path secondPathComplete = 
>>>>>>>> tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>>>               Files.write(secondPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>>             } catch (IOException | InterruptedException e) {
>>>>>>>>               throw new RuntimeException(e);
>>>>>>>>             }
>>>>>>>>           });
>>>>>>>>   writer.start();
>>>>>>>>
>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .inWindow(firstWindow)
>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L), 
>>>>>>>> KV.of("my-key", 3L));
>>>>>>>>
>>>>>>>>   // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .inWindow(secondWindow)
>>>>>>>>       .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L), 
>>>>>>>> KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>>   // THIS ASSERTION PASSES.
>>>>>>>>   PAssert.that(output)
>>>>>>>>       .containsInAnyOrder(
>>>>>>>>           KV.of("my-key", 1L),
>>>>>>>>           KV.of("my-key", 2L),
>>>>>>>>           KV.of("my-key", 3L),
>>>>>>>>           KV.of("my-key", 4L),
>>>>>>>>           KV.of("my-key", 5L),
>>>>>>>>           KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>>   p.run();
>>>>>>>>
>>>>>>>>   writer.join();
>>>>>>>> }
>>>>>>>>
>>>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to
>>>>>>>> match filepattern. Then the file *Metadata* is processed by my
>>>>>>>> custom Splittable DoFn.
>>>>>>>>
>>>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>>>     Pipeline pipeline, String filepattern, MatchConfiguration 
>>>>>>>> matchConfiguration) {
>>>>>>>>   PCollection<Metadata> matches =
>>>>>>>>       pipeline.apply(
>>>>>>>>           
>>>>>>>> FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>>>   return matches.apply(ParDo.of(new 
>>>>>>>> ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>>>> }
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * Processes matched files by outputting key-value pairs where key is 
>>>>>>>> equal to "my-key" and values
>>>>>>>>  * are Long values corresponding to the lines in the file. In the case 
>>>>>>>> file does not contain one
>>>>>>>>  * Long per line, IOException is thrown.
>>>>>>>>  */
>>>>>>>> @DoFn.BoundedPerElement
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, 
>>>>>>>> KV<String, Long>> {
>>>>>>>>   private static final Logger logger = 
>>>>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(
>>>>>>>>       ProcessContext c,
>>>>>>>>       RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>>       ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>>>       throws IOException {
>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>     logger.info(
>>>>>>>>         "reading {} with restriction {} @ {}",
>>>>>>>>         metadata,
>>>>>>>>         tracker.currentRestriction(),
>>>>>>>>         c.timestamp());
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     Instant timestamp = getTimestamp(filename);
>>>>>>>>     try (BufferedReader br = new BufferedReader(new 
>>>>>>>> FileReader(filename))) {
>>>>>>>>       String line;
>>>>>>>>       for (long lineNumber = 0; (line = br.readLine()) != null; 
>>>>>>>> ++lineNumber) {
>>>>>>>>         if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>>>             || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>>>           continue;
>>>>>>>>         }
>>>>>>>>         if (!tracker.tryClaim(lineNumber)) {
>>>>>>>>           logger.info("failed to claim {}", lineNumber);
>>>>>>>>           return;
>>>>>>>>         }
>>>>>>>>         c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)), 
>>>>>>>> timestamp);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>     logger.info("setting watermark to {}", timestamp);
>>>>>>>>     watermarkEstimator.setWatermark(timestamp);
>>>>>>>>     logger.info("Finish processing {} in file {}", 
>>>>>>>> tracker.currentRestriction(), filename);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private Instant getTimestamp(String filepath) {
>>>>>>>>     // Filename is assumed to be either yyyy-MM-dd or 
>>>>>>>> yyyy-MM-dd.complete.
>>>>>>>>     String filename = Paths.get(filepath).getFileName().toString();
>>>>>>>>     int index = filename.lastIndexOf(".complete");
>>>>>>>>     if (index != -1) {
>>>>>>>>       // In the case it has a suffix, strip it.
>>>>>>>>       filename = filename.substring(0, index);
>>>>>>>>     }
>>>>>>>>     Instant timestamp =
>>>>>>>>         Instant.parse(new 
>>>>>>>> StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>>>     if (index != -1) {
>>>>>>>>       // In the case it has a suffix i.e. it is complete, fast forward 
>>>>>>>> to the next day.
>>>>>>>>       return timestamp.plus(Duration.standardDays(1));
>>>>>>>>     }
>>>>>>>>     return timestamp;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @GetInitialRestriction
>>>>>>>>   public OffsetRange getInitialRestriction(@Element Metadata metadata) 
>>>>>>>> throws IOException {
>>>>>>>>     long lineCount;
>>>>>>>>     try (Stream<String> stream = 
>>>>>>>> Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>>>       lineCount = stream.count();
>>>>>>>>     }
>>>>>>>>     return new OffsetRange(0L, lineCount);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @GetInitialWatermarkEstimatorState
>>>>>>>>   public Instant getInitialWatermarkEstimatorState(
>>>>>>>>       @Element Metadata metadata, @Restriction OffsetRange 
>>>>>>>> restriction) {
>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>     logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>>>     // Compute and return the initial watermark estimator state for 
>>>>>>>> each element and restriction.
>>>>>>>>     // All subsequent processing of an element and restriction will be 
>>>>>>>> restored from the existing
>>>>>>>>     // state.
>>>>>>>>     return getTimestamp(filename);
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static Instant ensureTimestampWithinBounds(Instant 
>>>>>>>> timestamp) {
>>>>>>>>     if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>>     } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>>>       timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>>>     }
>>>>>>>>     return timestamp;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @NewWatermarkEstimator
>>>>>>>>   public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>>>       @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>>>     logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>>>     return new 
>>>>>>>> WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>>>> this topic.
>>>>>>>>>
>>>>>>>>> The TLDR; is that FileIO.match() should allow users to control the
>>>>>>>>> watermark estimator that is used and for your use case you should 
>>>>>>>>> hold the
>>>>>>>>> watermark to some computable value (e.g. the files are generated 
>>>>>>>>> every hour
>>>>>>>>> so once you know the last file has appeared for that hour you advance 
>>>>>>>>> the
>>>>>>>>> watermark to the current hour).
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am looking into:
>>>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>>>> since I would like to create a continuous pipeline that reads from 
>>>>>>>>>> files
>>>>>>>>>> and assigns Event Times based on e.g. file metadata or actual data 
>>>>>>>>>> inside
>>>>>>>>>> the file. For example:
>>>>>>>>>>
>>>>>>>>>> private static void run(String[] args) {
>>>>>>>>>>   PipelineOptions options = 
>>>>>>>>>> PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>
>>>>>>>>>>   PCollection<Metadata> matches = pipeline
>>>>>>>>>>       .apply(FileIO.match()
>>>>>>>>>>           .filepattern("/tmp/input/*")
>>>>>>>>>>           .continuously(Duration.standardSeconds(15), 
>>>>>>>>>> Watch.Growth.never()));
>>>>>>>>>>   matches
>>>>>>>>>>       .apply(ParDo.of(new ReadFileFn()));
>>>>>>>>>>
>>>>>>>>>>   pipeline.run();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String> 
>>>>>>>>>> {
>>>>>>>>>>   private static final Logger logger = 
>>>>>>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>>
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void processElement(ProcessContext c) throws IOException {
>>>>>>>>>>     Metadata metadata = c.element();
>>>>>>>>>>     // I believe c.timestamp() is based on processing time.
>>>>>>>>>>     logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>>>     String filename = metadata.resourceId().toString();
>>>>>>>>>>     // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>>>     // current input minus the allowed skew (0 milliseconds).
>>>>>>>>>>     Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>>>     logger.info("lastModified @ {}", timestamp);
>>>>>>>>>>     try (BufferedReader br = new BufferedReader(new 
>>>>>>>>>> FileReader(filename))) {
>>>>>>>>>>       String line;
>>>>>>>>>>       while ((line = br.readLine()) != null) {
>>>>>>>>>>         c.outputWithTimestamp(line, c.timestamp());
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am
>>>>>>>>>> getting:
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no 
>>>>>>>>>> earlier
>>>>>>>>>> than the timestamp of the current input (2020-10-08T20:39:44.286Z) 
>>>>>>>>>> minus
>>>>>>>>>> the allowed skew (0 milliseconds). See the 
>>>>>>>>>> DoFn#getAllowedTimestampSkew()
>>>>>>>>>> Javadoc for details on changing the allowed skew.
>>>>>>>>>>
>>>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>>>> call to continuously() makes the PCollection unbounded and
>>>>>>>>>> assigns default Event Time. Without the call to continuously() I can 
>>>>>>>>>> assign
>>>>>>>>>> the timestamps without problems either via c.outputWithTimestamp
>>>>>>>>>> or WithTimestamp transform.
>>>>>>>>>>
>>>>>>>>>> I would like to know what is the way to fix the issue, and
>>>>>>>>>> whether this use-case is currently supported in Beam.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Piotr
>>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Piotr
>>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Piotr
>>>
>>
>>
>> --
>> Best regards,
>> Piotr
>>
>

-- 
Best regards,
Piotr
