Here is the simplified code that works e2e:
static class GetData extends PTransform<PBegin, PCollection<KV<String, Long>>> {
  private final String filepattern;
  private final Duration pollInterval;
  private final TerminationCondition terminationCondition;

  GetData(String filepattern, Duration pollInterval,
      TerminationCondition terminationCondition) {
    this.filepattern = filepattern;
    this.pollInterval = pollInterval;
    this.terminationCondition = terminationCondition;
  }

  @Override
  public PCollection<KV<String, Long>> expand(PBegin input) {
    final Growth<String, MatchResult.Metadata, MatchResult.Metadata>
        stringMetadataStringGrowth =
            Watch.growthOf(new MatchPollFn())
                .withPollInterval(pollInterval)
                .withTerminationPerInput(terminationCondition);
    return input
        .apply("Create filepattern", Create.<String>of(filepattern))
        .apply("Continuously match filepatterns", stringMetadataStringGrowth)
        .apply(Values.create())
        .apply(ParDo.of(new ReadFileFn()));
  }
}

/**
 * Outputs hardcoded values, or nothing if the file name ends with ".complete".
 * The event times and watermarks are unchanged.
 */
private static final class ReadFileFn extends DoFn<Metadata, KV<String, Long>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Metadata metadata = c.element();
    String filename = metadata.resourceId().toString();
    if (filename.endsWith(".complete")) {
      return;
    }
    c.output(KV.of("my-key", 1L));
    c.output(KV.of("my-key", 2L));
  }
}

/**
 * Matches input filepattern and outputs matched file {@code Metadata}. The
 * timestamps of the values outputted are based on the filenames, which are
 * assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete. The watermark is
 * set to the maximum of all the outputted event timestamps.
 */
private static class MatchPollFn extends PollFn<String, Metadata> {
  private static final Logger logger =
      LoggerFactory.getLogger(MatchPollFn.class);

  @Override
  public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
      throws Exception {
    final List<Metadata> metadataList =
        FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata();
    List<TimestampedValue<Metadata>> outputs = new LinkedList<>();
    Instant watermark = Instant.EPOCH;
    for (Metadata metadata : metadataList) {
      String filename = metadata.resourceId().toString();
      final Instant timestamp = getTimestamp(filename);
      outputs.add(TimestampedValue.of(metadata, timestamp));
      if (timestamp.isAfter(watermark)) {
        watermark = timestamp;
      }
    }
    logger.info("outputting watermark {}", watermark);
    return Watch.Growth.PollResult.incomplete(outputs).withWatermark(watermark);
  }
}

/**
 * Returns {@link Instant} based on the {@code filepath}. Filename is assumed
 * to be either yyyy-MM-dd or yyyy-MM-dd.complete. The returned value is
 * "yyyy-MM-ddT00:00:00.000Z" for yyyy-MM-dd and
 * "yyyy-MM-ddT00:00:00.000Z"+24h for yyyy-MM-dd.complete.
 */
public static Instant getTimestamp(String filepath) {
  String filename = Paths.get(filepath).getFileName().toString();
  int index = filename.lastIndexOf(".complete");
  if (index != -1) {
    // In the case it has a suffix, strip it.
    filename = filename.substring(0, index);
  }
  Instant timestamp = Instant.parse(filename, dateTimeFormatter);
  if (index != -1) {
    // In the case it has a suffix i.e. it is complete, fast forward to the
    // next day.
    return timestamp.plus(Duration.standardDays(1));
  }
  return timestamp;
}
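
Note that dateTimeFormatter is not shown in the snippet above. As a rough, Beam-free illustration of the same filename rule, here is a sketch that uses java.time instead of Joda-Time. The class name FilenameTimestamps is made up for this example and is not part of the pipeline:

```java
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper, not part of the pipeline above.
final class FilenameTimestamps {
  private static final String COMPLETE_SUFFIX = ".complete";

  /** Maps yyyy-MM-dd to midnight UTC of that day, and yyyy-MM-dd.complete to midnight of the next day. */
  static Instant getTimestamp(String filepath) {
    String filename = Paths.get(filepath).getFileName().toString();
    boolean complete = filename.endsWith(COMPLETE_SUFFIX);
    if (complete) {
      // Strip the marker suffix so only the date remains.
      filename = filename.substring(0, filename.length() - COMPLETE_SUFFIX.length());
    }
    // LocalDate.parse expects the ISO yyyy-MM-dd form by default.
    Instant startOfDay = LocalDate.parse(filename).atStartOfDay(ZoneOffset.UTC).toInstant();
    return complete ? startOfDay.plus(Duration.ofDays(1)) : startOfDay;
  }

  public static void main(String[] args) {
    System.out.println(getTimestamp("/tmp/input/2020-01-01"));
    System.out.println(getTimestamp("/tmp/input/2020-01-01.complete"));
  }
}
```

A name that does not match the date pattern makes LocalDate.parse throw a DateTimeParseException, which surfaces a bad filename instead of silently masking it.
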
On Mon, Oct 19, 2020 at 9:56 AM Luke Cwik <[email protected]> wrote:
> For future reference, what did you have to change to get it to work?
>
> On Thu, Oct 15, 2020 at 2:40 PM Piotr Filipiuk <[email protected]>
> wrote:
>
>> Made it work e2e. Thank you all for the help!
>>
>> On Wed, Oct 14, 2020 at 3:48 PM Piotr Filipiuk <[email protected]>
>> wrote:
>>
>>> Got it, thank you for the clarification.
>>>
>>> I tried to run the pipeline locally, with the following main (see full
>>> source code attached):
>>>
>>> public static void main(String[] args) {
>>> PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>> Pipeline pipeline = Pipeline.create(options);
>>> logger.info("running");
>>>
>>> PCollection<KV<String, Long>> output =
>>> FileProcessing.getData(
>>> pipeline, "/tmp/input/*", Duration.standardSeconds(1),
>>> Growth.never());
>>>
>>> output
>>> .apply("Window",
>>> Window.into(FixedWindows.of(Duration.standardDays(1))))
>>> .apply("LogWindowed", Log.ofElements("testGetData"))
>>> .apply(Sum.longsPerKey())
>>> .apply(
>>> "FormatResults",
>>> MapElements.into(TypeDescriptors.strings())
>>> .via((KV<String, Long> kv) -> String.format("%s,%s",
>>> kv.getKey(), kv.getValue())))
>>> .apply("LogResults", Log.ofElements("results"))
>>> .apply(
>>> TextIO.write()
>>> .to(Paths.get("/tmp/output/").resolve("Results").toString())
>>> .withWindowedWrites()
>>> .withNumShards(1));
>>>
>>> pipeline.run();
>>> }
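
A side note on the "FormatResults" step: String.format expects printf-style specifiers such as %s, while {} placeholders are an SLF4J logging convention that String.format leaves untouched. A tiny stand-alone sketch of the difference (FormatPlaceholders is a hypothetical name used only here):

```java
// Hypothetical demo class; it only contrasts the two placeholder styles.
final class FormatPlaceholders {
  /** printf-style specifiers are substituted by String.format. */
  static String printfStyle(String key, long value) {
    return String.format("%s,%d", key, value);
  }

  /** SLF4J-style braces are plain text to String.format; the extra arguments are ignored. */
  static String slf4jStyle(String key, long value) {
    return String.format("{},{}", key, value);
  }

  public static void main(String[] args) {
    System.out.println(printfStyle("my-key", 3L));
    System.out.println(slf4jStyle("my-key", 3L));
  }
}
```
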
>>>
>>>
>>> Then I am generating files using:
>>>
>>> for i in {01..30}; do echo "handling $i"; printf '1\n2\n3\n4\n' >
>>> /tmp/input/1985-10-$i; sleep 2; touch /tmp/input/1985-10-$i.complete; sleep
>>> 2; done
>>>
>>> I do not see any outputs being generated though. Can you elaborate why
>>> that might be? I would suspect that once the watermark is set to day+1, the
>>> results of the previous day should be finalized and hence the result for a
>>> given window should be outputted.
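
The expectation above can be checked with simple arithmetic. Below is a rough sketch, assuming epoch-aligned one-day fixed windows and ignoring allowed lateness and triggers. DailyWindows is a hypothetical helper, not a Beam class:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that mimics epoch-aligned fixed windows of one day.
final class DailyWindows {
  static final Duration SIZE = Duration.ofDays(1);

  /** Start of the window containing the timestamp (inclusive). */
  static Instant windowStart(Instant timestamp) {
    long millis = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, SIZE.toMillis()));
  }

  /** End of the window containing the timestamp (exclusive). */
  static Instant windowEnd(Instant timestamp) {
    return windowStart(timestamp).plus(SIZE);
  }

  /** The window can be finalized once the watermark has reached its end. */
  static boolean canFinalize(Instant elementTimestamp, Instant watermark) {
    return !watermark.isBefore(windowEnd(elementTimestamp));
  }

  public static void main(String[] args) {
    Instant element = Instant.parse("1985-10-01T00:00:00Z");
    System.out.println(canFinalize(element, Instant.EPOCH));
    System.out.println(canFinalize(element, Instant.parse("1985-10-02T00:00:00Z")));
  }
}
```

Under these assumptions, a watermark that stays at EPOCH never lets any window finalize, while a watermark of day+1 is enough for the window of the previous day.
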
>>>
>>> On Wed, Oct 14, 2020 at 1:41 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> I think you should be using the largest "complete" timestamp from the
>>>> metadata results and not be setting the watermark if you don't have one.
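
One way to read this suggestion, as a Beam-free sketch: derive the watermark only from the ".complete" markers and return nothing when there is none, so that the caller can skip withWatermark in that case. CompleteWatermark and watermarkFor are hypothetical names, and java.time stands in for Joda-Time:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper; file names are assumed to be yyyy-MM-dd or yyyy-MM-dd.complete.
final class CompleteWatermark {
  private static final String COMPLETE_SUFFIX = ".complete";

  /** Returns the start of the day after the latest completed day, or empty if no day is complete. */
  static Optional<Instant> watermarkFor(List<String> filenames) {
    return filenames.stream()
        .filter(name -> name.endsWith(COMPLETE_SUFFIX))
        .map(name -> name.substring(0, name.length() - COMPLETE_SUFFIX.length()))
        .map(day -> LocalDate.parse(day).plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant())
        .max(Comparator.naturalOrder());
  }

  public static void main(String[] args) {
    System.out.println(
        watermarkFor(Arrays.asList("2020-01-01", "2020-01-01.complete", "2020-01-02")));
    System.out.println(watermarkFor(Arrays.asList("2020-01-01")));
  }
}
```
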
>>>>
>>>> On Wed, Oct 14, 2020 at 11:47 AM Piotr Filipiuk <
>>>> [email protected]> wrote:
>>>>
>>>>> Thank you so much for the input, that was extremely helpful!
>>>>>
>>>>> I changed the pipeline from using FileIO.match() to using a custom
>>>>> matcher (very similar to FileIO.match()) that looks as follows:
>>>>>
>>>>> static PCollection<KV<String, Long>> getData(
>>>>> Pipeline pipeline,
>>>>> String filepattern,
>>>>> Duration pollInterval,
>>>>> TerminationCondition terminationCondition) {
>>>>> final Growth<String, MatchResult.Metadata, String>
>>>>> stringMetadataStringGrowth =
>>>>> Watch.growthOf(
>>>>> Contextful.of(new MatchPollFn(), Requirements.empty()), new
>>>>> ExtractFilenameFn())
>>>>> .withPollInterval(pollInterval)
>>>>> .withTerminationPerInput(terminationCondition);
>>>>> return pipeline
>>>>> .apply("Create filepattern", Create.<String>of(filepattern))
>>>>> .apply("Continuously match filepatterns",
>>>>> stringMetadataStringGrowth)
>>>>> .apply(Values.create())
>>>>> .apply(ParDo.of(new ReadFileFn()));
>>>>> }
>>>>>
>>>>> private static class MatchPollFn extends PollFn<String, Metadata> {
>>>>> private static final Logger logger =
>>>>> LoggerFactory.getLogger(MatchPollFn.class);
>>>>>
>>>>> @Override
>>>>> public Watch.Growth.PollResult<MatchResult.Metadata> apply(String
>>>>> element, Context c)
>>>>> throws Exception {
>>>>> // Here we only have the filepattern i.e. element, and hence we do not
>>>>> // know what the timestamp and/or watermark should be. As a result, we
>>>>> // output EPOCH as both the timestamp and the watermark.
>>>>> Instant instant = Instant.EPOCH;
>>>>> return Watch.Growth.PollResult.incomplete(
>>>>> instant, FileSystems.match(element,
>>>>> EmptyMatchTreatment.ALLOW).metadata())
>>>>> .withWatermark(instant);
>>>>> }
>>>>> }
>>>>>
>>>>> private static class ExtractFilenameFn implements
>>>>> SerializableFunction<Metadata, String> {
>>>>> @Override
>>>>> public String apply(MatchResult.Metadata input) {
>>>>> return input.resourceId().toString();
>>>>> }
>>>>> }
>>>>>
>>>>> The above together with fixing the bugs that Luke pointed out (Thank
>>>>> you Luke!), makes the unit test pass.
>>>>>
>>>>> Thank you again!
>>>>>
>>>>> If you have any feedback for the current code, I would appreciate it.
>>>>> I am especially interested whether setting event time and watermark in
>>>>> *MatchPollFn* to *EPOCH* is a correct way to go.
>>>>>
>>>>>
>>>>> On Wed, Oct 14, 2020 at 9:49 AM Reuven Lax <[email protected]> wrote:
>>>>>
>>>>>> FYI this is a major limitation in FileIO.match's watermarking
>>>>>> ability. I believe there is a JIRA issue about this, but nobody has ever
>>>>>> worked on improving it.
>>>>>>
>>>>>> On Wed, Oct 14, 2020 at 9:38 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> FileIO.match doesn't allow one to configure how the watermark
>>>>>>> advances and it assumes that the watermark during polling is always the
>>>>>>> current system time[1].
>>>>>>>
>>>>>>> Because of this the downstream watermark advancement is limited.
>>>>>>> When an element and restriction starts processing, the maximum you can
>>>>>>> hold the output watermark back by for this element and restriction pair
>>>>>>> is limited to the current input watermark (a common value to use is the
>>>>>>> current element's timestamp as the lower bound for all future output
>>>>>>> but if that element is late the output you produce may or may not be
>>>>>>> late (depends on downstream windowing strategy)). Holding this
>>>>>>> watermark back is important since many of these elements and
>>>>>>> restrictions could be processed in parallel at different rates.
>>>>>>>
>>>>>>> Based upon your implementation, you wouldn't need to control the
>>>>>>> watermark from the file reading splittable DoFn if FileIO.match allowed
>>>>>>> you to say what the watermark is after each polling round and allowed
>>>>>>> you to set the timestamp for each match found. This initial setting of
>>>>>>> the watermark during polling would be properly handled by the runner to
>>>>>>> block watermark advancement for those elements.
>>>>>>>
>>>>>>> Minor comments not related to your issue but would improve your
>>>>>>> implementation:
>>>>>>> 1) Typically you set the watermark right before returning. You are
>>>>>>> missing this from the failed tryClaim loop return.
>>>>>>> 2) You should structure your loop not based upon the end of the
>>>>>>> current restriction but continue processing till tryClaim fails. For
>>>>>>> example:
>>>>>>> @ProcessElement
>>>>>>> public void processElement(@Element String fileName,
>>>>>>> RestrictionTracker<OffsetRange, Long> tracker, OutputReceiver<Integer>
>>>>>>> outputReceiver) throws IOException {
>>>>>>> RandomAccessFile file = new RandomAccessFile(fileName, "r");
>>>>>>> seekToNextRecordBoundaryInFile(file,
>>>>>>> tracker.currentRestriction().getFrom());
>>>>>>> while (tracker.tryClaim(file.getFilePointer())) {
>>>>>>> outputReceiver.output(readNextRecord(file));
>>>>>>> }
>>>>>>> }
>>>>>>> 3) ensureTimestampWithinBounds is dangerous as you're masking a
>>>>>>> possible data issue, since the code may have parsed some filename
>>>>>>> incorrectly. It is likely that you copied this from Beam code and it
>>>>>>> is used there because user implementations of UnboundedSource were
>>>>>>> incorrectly setting the watermark outside of the bounds and there is
>>>>>>> no way to fix them.
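
To illustrate point 2, here is a toy, Beam-free sketch of why the loop should be driven by tryClaim rather than by the end of the restriction read up front. ToyOffsetTracker and its shrinkTo method are hypothetical stand-ins for a RestrictionTracker whose range a runner may split. They are not Beam APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a restriction tracker over the half-open range [from, to).
final class ToyOffsetTracker {
  private final long from;
  private long to;

  ToyOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  /** Claims the offset if it is still inside the (possibly shrunken) range. */
  boolean tryClaim(long offset) {
    return offset >= from && offset < to;
  }

  /** Simulates a runner-initiated split that hands the tail of the range to someone else. */
  void shrinkTo(long newTo) {
    to = Math.min(to, newTo);
  }

  /** Processes offsets starting at start until tryClaim fails; returns the claimed offsets. */
  static List<Long> claimAll(ToyOffsetTracker tracker, long start) {
    List<Long> claimed = new ArrayList<>();
    long offset = start;
    while (tracker.tryClaim(offset)) {
      claimed.add(offset);
      offset++;
    }
    return claimed;
  }

  public static void main(String[] args) {
    ToyOffsetTracker tracker = new ToyOffsetTracker(0, 5);
    tracker.shrinkTo(3); // the range is now [0, 3)
    System.out.println(claimAll(tracker, 0));
  }
}
```

A loop bounded by an end offset captured before the split would keep going past the new end, whereas the tryClaim loop stops as soon as the range no longer covers the offset.
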
>>>>>>>
>>>>>>> 1:
>>>>>>> https://github.com/apache/beam/blob/29787b38b594e29428adaf230b45f9b33e24fa66/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L666
>>>>>>>
>>>>>>> On Tue, Oct 13, 2020 at 6:04 PM Piotr Filipiuk <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Thank you for a quick response. I tried to follow the doc attached
>>>>>>>> and read existing Beam code that uses the Splittable DoFns, and I made
>>>>>>>> some progress.
>>>>>>>>
>>>>>>>> I created a simple pipeline that matches given filepattern, and
>>>>>>>> uses splittable dofn to control event times and watermarks. The
>>>>>>>> pipeline expects files with the following name patterns:
>>>>>>>>
>>>>>>>> *yyyy-MM-dd*
>>>>>>>>
>>>>>>>> *yyyy-MM-dd.complete*
>>>>>>>>
>>>>>>>> Every time it sees *yyyy-MM-dd*, it reads its contents and outputs
>>>>>>>> lines of the file using *outputWithTimestamp(..., timestamp)*.
>>>>>>>> Additionally, it calls *watermarkEstimator.setWatermark(timestamp)*.
>>>>>>>> In both cases the timestamp is *yyyy-MM-ddT00:00:00.000Z*.
>>>>>>>>
>>>>>>>> Once the pipeline matches *yyyy-MM-dd.complete* (which is empty)
>>>>>>>> it calls *watermarkEstimator.setWatermark(timestamp)*, where
>>>>>>>> timestamp is *yyyy-MM-ddT00:00:00.000Z plus one day* - hence it
>>>>>>>> advances to the next day.
>>>>>>>>
>>>>>>>> I am at the point where the following unit test fails the inWindow()
>>>>>>>> assertions, while the last assertion passes. It seems that even though
>>>>>>>> I call watermarkEstimator.setWatermark() the window is not being closed.
>>>>>>>>
>>>>>>>> I would appreciate help/suggestions on what I am missing.
>>>>>>>>
>>>>>>>> Here is a unit test. The function being tested is getData() defined
>>>>>>>> below.
>>>>>>>>
>>>>>>>> public void testGetDataWithNewFiles() throws InterruptedException {
>>>>>>>> final Duration duration = Duration.standardDays(1);
>>>>>>>>
>>>>>>>> IntervalWindow firstWindow =
>>>>>>>> new IntervalWindow(Instant.parse("2020-01-01T00:00:00.000Z"),
>>>>>>>> duration);
>>>>>>>> logger.info("first window {}", firstWindow);
>>>>>>>> IntervalWindow secondWindow =
>>>>>>>> new IntervalWindow(Instant.parse("2020-01-02T00:00:00.000Z"),
>>>>>>>> duration);
>>>>>>>> logger.info("second window {}", secondWindow);
>>>>>>>>
>>>>>>>> MatchConfiguration matchConfiguration =
>>>>>>>> MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)
>>>>>>>> .continuously(
>>>>>>>> Duration.millis(100),
>>>>>>>>
>>>>>>>> Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(5)));
>>>>>>>>
>>>>>>>> PCollection<KV<String, Long>> output =
>>>>>>>> FileProcessing.getData(p, tmpFolder.getRoot().getAbsolutePath()
>>>>>>>> + "/*", matchConfiguration)
>>>>>>>> .apply("Window", Window.into(FixedWindows.of(duration)))
>>>>>>>> .apply("LogWindowedResult", Log.ofElements("testGetData"));
>>>>>>>>
>>>>>>>> assertEquals(PCollection.IsBounded.UNBOUNDED, output.isBounded());
>>>>>>>>
>>>>>>>> Thread writer =
>>>>>>>> new Thread(
>>>>>>>> () -> {
>>>>>>>> try {
>>>>>>>> Thread.sleep(1000);
>>>>>>>>
>>>>>>>> Path firstPath =
>>>>>>>> tmpFolder.newFile("2020-01-01").toPath();
>>>>>>>> Files.write(firstPath, Arrays.asList("1", "2", "3"));
>>>>>>>>
>>>>>>>> Thread.sleep(1000);
>>>>>>>>
>>>>>>>> Path firstPathComplete =
>>>>>>>> tmpFolder.newFile("2020-01-01.complete").toPath();
>>>>>>>> Files.write(firstPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>> Thread.sleep(1000);
>>>>>>>>
>>>>>>>> Path secondPath =
>>>>>>>> tmpFolder.newFile("2020-01-02").toPath();
>>>>>>>> Files.write(secondPath, Arrays.asList("4", "5", "6"));
>>>>>>>>
>>>>>>>> Thread.sleep(1000);
>>>>>>>>
>>>>>>>> Path secondPathComplete =
>>>>>>>> tmpFolder.newFile("2020-01-02.complete").toPath();
>>>>>>>> Files.write(secondPathComplete, Arrays.asList());
>>>>>>>>
>>>>>>>> } catch (IOException | InterruptedException e) {
>>>>>>>> throw new RuntimeException(e);
>>>>>>>> }
>>>>>>>> });
>>>>>>>> writer.start();
>>>>>>>>
>>>>>>>> // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>> PAssert.that(output)
>>>>>>>> .inWindow(firstWindow)
>>>>>>>> .containsInAnyOrder(KV.of("my-key", 1L), KV.of("my-key", 2L),
>>>>>>>> KV.of("my-key", 3L));
>>>>>>>>
>>>>>>>> // THIS ASSERTION FAILS, THERE ARE NO ELEMENTS IN THIS WINDOW.
>>>>>>>> PAssert.that(output)
>>>>>>>> .inWindow(secondWindow)
>>>>>>>> .containsInAnyOrder(KV.of("my-key", 4L), KV.of("my-key", 5L),
>>>>>>>> KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>> // THIS ASSERTION PASSES.
>>>>>>>> PAssert.that(output)
>>>>>>>> .containsInAnyOrder(
>>>>>>>> KV.of("my-key", 1L),
>>>>>>>> KV.of("my-key", 2L),
>>>>>>>> KV.of("my-key", 3L),
>>>>>>>> KV.of("my-key", 4L),
>>>>>>>> KV.of("my-key", 5L),
>>>>>>>> KV.of("my-key", 6L));
>>>>>>>>
>>>>>>>> p.run();
>>>>>>>>
>>>>>>>> writer.join();
>>>>>>>> }
>>>>>>>>
>>>>>>>> Here is the code. Essentially, I am using *FileIO.match()* to
>>>>>>>> match filepattern. Then the file *Metadata* is processed by my
>>>>>>>> custom Splittable DoFn.
>>>>>>>>
>>>>>>>> static PCollection<KV<String, Long>> getData(
>>>>>>>> Pipeline pipeline, String filepattern, MatchConfiguration
>>>>>>>> matchConfiguration) {
>>>>>>>> PCollection<Metadata> matches =
>>>>>>>> pipeline.apply(
>>>>>>>>
>>>>>>>> FileIO.match().filepattern(filepattern).withConfiguration(matchConfiguration));
>>>>>>>> return matches.apply(ParDo.of(new
>>>>>>>> ReadFileFn())).apply(Log.ofElements("Get Data"));
>>>>>>>> }
>>>>>>>>
>>>>>>>> /**
>>>>>>>> * Processes matched files by outputting key-value pairs where the key
>>>>>>>> * is equal to "my-key" and the values are Long values corresponding to
>>>>>>>> * the lines in the file. If a line cannot be parsed as a Long,
>>>>>>>> * Long.parseLong throws a NumberFormatException.
>>>>>>>> */
>>>>>>>> @DoFn.BoundedPerElement
>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata,
>>>>>>>> KV<String, Long>> {
>>>>>>>> private static final Logger logger =
>>>>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>
>>>>>>>> @ProcessElement
>>>>>>>> public void processElement(
>>>>>>>> ProcessContext c,
>>>>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>>>>> ManualWatermarkEstimator<Instant> watermarkEstimator)
>>>>>>>> throws IOException {
>>>>>>>> Metadata metadata = c.element();
>>>>>>>> logger.info(
>>>>>>>> "reading {} with restriction {} @ {}",
>>>>>>>> metadata,
>>>>>>>> tracker.currentRestriction(),
>>>>>>>> c.timestamp());
>>>>>>>> String filename = metadata.resourceId().toString();
>>>>>>>> Instant timestamp = getTimestamp(filename);
>>>>>>>> try (BufferedReader br = new BufferedReader(new
>>>>>>>> FileReader(filename))) {
>>>>>>>> String line;
>>>>>>>> for (long lineNumber = 0; (line = br.readLine()) != null;
>>>>>>>> ++lineNumber) {
>>>>>>>> if (lineNumber < tracker.currentRestriction().getFrom()
>>>>>>>> || lineNumber >= tracker.currentRestriction().getTo()) {
>>>>>>>> continue;
>>>>>>>> }
>>>>>>>> if (!tracker.tryClaim(lineNumber)) {
>>>>>>>> logger.info("failed to claim {}", lineNumber);
>>>>>>>> return;
>>>>>>>> }
>>>>>>>> c.outputWithTimestamp(KV.of("my-key", Long.parseLong(line)),
>>>>>>>> timestamp);
>>>>>>>> }
>>>>>>>> }
>>>>>>>> logger.info("setting watermark to {}", timestamp);
>>>>>>>> watermarkEstimator.setWatermark(timestamp);
>>>>>>>> logger.info("Finish processing {} in file {}",
>>>>>>>> tracker.currentRestriction(), filename);
>>>>>>>> }
>>>>>>>>
>>>>>>>> private Instant getTimestamp(String filepath) {
>>>>>>>> // Filename is assumed to be either yyyy-MM-dd or yyyy-MM-dd.complete.
>>>>>>>> String filename = Paths.get(filepath).getFileName().toString();
>>>>>>>> int index = filename.lastIndexOf(".complete");
>>>>>>>> if (index != -1) {
>>>>>>>> // In the case it has a suffix, strip it.
>>>>>>>> filename = filename.substring(0, index);
>>>>>>>> }
>>>>>>>> Instant timestamp =
>>>>>>>> Instant.parse(new
>>>>>>>> StringBuilder().append(filename).append("T00:00:00.000Z").toString());
>>>>>>>> if (index != -1) {
>>>>>>>> // In the case it has a suffix i.e. it is complete, fast forward to
>>>>>>>> // the next day.
>>>>>>>> return timestamp.plus(Duration.standardDays(1));
>>>>>>>> }
>>>>>>>> return timestamp;
>>>>>>>> }
>>>>>>>>
>>>>>>>> @GetInitialRestriction
>>>>>>>> public OffsetRange getInitialRestriction(@Element Metadata metadata)
>>>>>>>> throws IOException {
>>>>>>>> long lineCount;
>>>>>>>> try (Stream<String> stream =
>>>>>>>> Files.lines(Paths.get(metadata.resourceId().toString()))) {
>>>>>>>> lineCount = stream.count();
>>>>>>>> }
>>>>>>>> return new OffsetRange(0L, lineCount);
>>>>>>>> }
>>>>>>>>
>>>>>>>> @GetInitialWatermarkEstimatorState
>>>>>>>> public Instant getInitialWatermarkEstimatorState(
>>>>>>>> @Element Metadata metadata, @Restriction OffsetRange
>>>>>>>> restriction) {
>>>>>>>> String filename = metadata.resourceId().toString();
>>>>>>>> logger.info("getInitialWatermarkEstimatorState {}", filename);
>>>>>>>> // Compute and return the initial watermark estimator state for each
>>>>>>>> // element and restriction. All subsequent processing of an element
>>>>>>>> // and restriction will be restored from the existing state.
>>>>>>>> return getTimestamp(filename);
>>>>>>>> }
>>>>>>>>
>>>>>>>> private static Instant ensureTimestampWithinBounds(Instant
>>>>>>>> timestamp) {
>>>>>>>> if (timestamp.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
>>>>>>>> timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
>>>>>>>> } else if (timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
>>>>>>>> timestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
>>>>>>>> }
>>>>>>>> return timestamp;
>>>>>>>> }
>>>>>>>>
>>>>>>>> @NewWatermarkEstimator
>>>>>>>> public WatermarkEstimators.Manual newWatermarkEstimator(
>>>>>>>> @WatermarkEstimatorState Instant watermarkEstimatorState) {
>>>>>>>> logger.info("newWatermarkEstimator {}", watermarkEstimatorState);
>>>>>>>> return new
>>>>>>>> WatermarkEstimators.Manual(ensureTimestampWithinBounds(watermarkEstimatorState));
>>>>>>>> }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2020 at 2:15 PM Luke Cwik <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I'm working on a blog post[1] about splittable dofns that covers
>>>>>>>>> this topic.
>>>>>>>>>
>>>>>>>>> The TL;DR is that FileIO.match() should allow users to control the
>>>>>>>>> watermark estimator that is used, and for your use case you should
>>>>>>>>> hold the watermark to some computable value (e.g. the files are
>>>>>>>>> generated every hour, so once you know the last file has appeared for
>>>>>>>>> that hour you advance the watermark to the current hour).
>>>>>>>>>
>>>>>>>>> 1:
>>>>>>>>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#heading=h.fo3wm9qs0vql
>>>>>>>>>
>>>>>>>>> On Thu, Oct 8, 2020 at 1:55 PM Piotr Filipiuk <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am looking into:
>>>>>>>>>> https://beam.apache.org/documentation/patterns/file-processing/
>>>>>>>>>> since I would like to create a continuous pipeline that reads from
>>>>>>>>>> files and assigns Event Times based on e.g. file metadata or actual
>>>>>>>>>> data inside the file. For example:
>>>>>>>>>>
>>>>>>>>>> private static void run(String[] args) {
>>>>>>>>>> PipelineOptions options =
>>>>>>>>>> PipelineOptionsFactory.fromArgs(args).create();
>>>>>>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>
>>>>>>>>>> PCollection<Metadata> matches = pipeline
>>>>>>>>>> .apply(FileIO.match()
>>>>>>>>>> .filepattern("/tmp/input/*")
>>>>>>>>>> .continuously(Duration.standardSeconds(15),
>>>>>>>>>> Watch.Growth.never()));
>>>>>>>>>> matches
>>>>>>>>>> .apply(ParDo.of(new ReadFileFn()));
>>>>>>>>>>
>>>>>>>>>> pipeline.run();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private static final class ReadFileFn extends DoFn<Metadata, String>
>>>>>>>>>> {
>>>>>>>>>> private static final Logger logger =
>>>>>>>>>> LoggerFactory.getLogger(ReadFileFn.class);
>>>>>>>>>>
>>>>>>>>>> @ProcessElement
>>>>>>>>>> public void processElement(ProcessContext c) throws IOException {
>>>>>>>>>> Metadata metadata = c.element();
>>>>>>>>>> // I believe c.timestamp() is based on processing time.
>>>>>>>>>> logger.info("reading {} @ {}", metadata, c.timestamp());
>>>>>>>>>> String filename = metadata.resourceId().toString();
>>>>>>>>>> // Output timestamps must be no earlier than the timestamp of the
>>>>>>>>>> // current input minus the allowed skew (0 milliseconds).
>>>>>>>>>> Instant timestamp = new Instant(metadata.lastModifiedMillis());
>>>>>>>>>> logger.info("lastModified @ {}", timestamp);
>>>>>>>>>> try (BufferedReader br = new BufferedReader(new
>>>>>>>>>> FileReader(filename))) {
>>>>>>>>>> String line;
>>>>>>>>>> while ((line = br.readLine()) != null) {
>>>>>>>>>> c.outputWithTimestamp(line, timestamp);
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> The issue is that when calling c.outputWithTimestamp() I am
>>>>>>>>>> getting:
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Cannot output with
>>>>>>>>>> timestamp 1970-01-01T00:00:00.000Z. Output timestamps must be no
>>>>>>>>>> earlier than the timestamp of the current input
>>>>>>>>>> (2020-10-08T20:39:44.286Z) minus the allowed skew (0 milliseconds).
>>>>>>>>>> See the DoFn#getAllowedTimestampSkew() Javadoc for details on
>>>>>>>>>> changing the allowed skew.
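
The rule stated in this error message can be written down as a small check. This is only a sketch of the rule as the message describes it, not Beam's actual implementation. TimestampSkewCheck is a hypothetical name and java.time stands in for Joda-Time:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that restates the rule from the error message.
final class TimestampSkewCheck {
  /** Throws if the output timestamp is earlier than the input timestamp minus the allowed skew. */
  static void checkOutputTimestamp(Instant input, Instant output, Duration allowedSkew) {
    Instant earliestAllowed = input.minus(allowedSkew);
    if (output.isBefore(earliestAllowed)) {
      throw new IllegalArgumentException(
          "Cannot output with timestamp " + output
              + "; earliest allowed is " + earliestAllowed);
    }
  }

  public static void main(String[] args) {
    Instant input = Instant.parse("2020-10-08T20:39:44.286Z");
    // Passes: the output timestamp equals the input timestamp.
    checkOutputTimestamp(input, input, Duration.ZERO);
    // Throws: the epoch is far earlier than the input timestamp with zero skew.
    checkOutputTimestamp(input, Instant.EPOCH, Duration.ZERO);
  }
}
```

With an input timestamp taken from the current time and a zero skew, any file-derived timestamp in the past fails this check, which matches the exception above.
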
>>>>>>>>>>
>>>>>>>>>> I believe this is because MatchPollFn.apply() uses Instant.now() as
>>>>>>>>>> the event time for the PCollection<Metadata>. I can see that the
>>>>>>>>>> call to continuously() makes the PCollection unbounded and
>>>>>>>>>> assigns default Event Time. Without the call to continuously() I can
>>>>>>>>>> assign the timestamps without problems either via
>>>>>>>>>> c.outputWithTimestamp or the WithTimestamps transform.
>>>>>>>>>>
>>>>>>>>>> I would like to know what is the way to fix the issue, and
>>>>>>>>>> whether this use-case is currently supported in Beam.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best regards,
>>>>>>>>>> Piotr
--
Best regards,
Piotr