Hi, I’m having trouble setting up a pipeline that continuously reads from S3,
does a small transformation and then combines per key:
pipeline
    .apply(FileIO.match()
        .filepattern(options.getDirectory() + "/input/*")
        .continuously(Duration.standardSeconds(10), Watch.Growth.never()))
    .apply("Read", FileIO.readMatches())
    .apply(TextIO.readFiles())
    .apply(Window.into(FixedWindows.of(Duration.standardSeconds(30))))
    .apply(ParDo.of(new SplitFn()))
    .apply(Combine.perKey(new CombinePerKeyFn()));
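SplitFn and CombinePerKeyFn aren't shown above. As a hypothetical stand-in, assume SplitFn turns each line into a (line, 1) pair and CombinePerKeyFn sums the values. Under that assumption, the plain-Java sketch below (no Beam dependency; class and method names are hypothetical) models what the windowed Combine.perKey should produce: one count per key for each 30-second fixed window.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of FixedWindows.of(30 s) followed by a per-key sum.
// Assumption: SplitFn emits (line, 1L) and CombinePerKeyFn sums the longs.
class WindowedCountSketch {
    static final long WINDOW_MILLIS = 30_000L;

    // Start of the fixed window containing the timestamp.
    // floorMod keeps this correct for negative timestamps as well.
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS);
    }

    // Adds 1 per element, grouped first by window start and then by key.
    static Map<Long, Map<String, Long>> countPerWindowAndKey(long[] timestamps, String[] keys) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            result.computeIfAbsent(windowStart(timestamps[i]), w -> new TreeMap<>())
                  .merge(keys[i], 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 29_999L, 30_000L, 45_000L};
        String[] keys = {"test1", "test1", "test1", "test2"};
        // Prints {0={test1=2}, 30000={test1=1, test2=1}}
        System.out.println(countPerWindowAndKey(ts, keys));
    }
}
```

In the real pipeline, these per-window results only appear once the watermark moves past each window, which is why a stuck watermark shows up as no output at all.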
That works in the direct runner, but in Flink the watermark isn’t advancing.
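A simplified model of why a stalled watermark appears as missing output: with the default trigger, a windowed Combine.perKey emits a window's result only after the watermark reaches the end of that window. The plain-Java sketch below ignores allowed lateness, late firings, and Beam's exact boundary rule, and its class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Simplified event-time triggering: a pending 30 s window fires once the
// watermark reaches its end. Lateness and late firings are ignored.
class WatermarkFiringSketch {
    static final long WINDOW_MILLIS = 30_000L;
    private final TreeSet<Long> pendingWindowStarts = new TreeSet<>();

    // Records that the window containing this timestamp has buffered data.
    void addElement(long timestampMillis) {
        pendingWindowStarts.add(timestampMillis - Math.floorMod(timestampMillis, WINDOW_MILLIS));
    }

    // Fires (and forgets) every pending window whose end is <= the watermark,
    // returning the start timestamps of the windows that fired, oldest first.
    List<Long> advanceWatermark(long watermarkMillis) {
        List<Long> fired = new ArrayList<>();
        while (!pendingWindowStarts.isEmpty()
                && pendingWindowStarts.first() + WINDOW_MILLIS <= watermarkMillis) {
            fired.add(pendingWindowStarts.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        WatermarkFiringSketch s = new WatermarkFiringSketch();
        s.addElement(5_000L);
        s.addElement(35_000L);
        System.out.println(s.advanceWatermark(Long.MIN_VALUE)); // [] : watermark never moved
        System.out.println(s.advanceWatermark(30_000L));        // [0]
    }
}
```

However much data is buffered, nothing is emitted while the watermark stays at its initial value. That matches the "works in the direct runner, silent in Flink" symptom.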
I asked someone else to look at it, and he replaced the file read with this
code so he didn’t have to create any input files:
pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(2)))
    .apply(MapElements.into(strings())
        .via(i -> "test" + ThreadLocalRandom.current().nextInt(10)));
Surprisingly, the pipeline started working. GenerateSequence doesn’t use a
splittable DoFn the way the file read does, so we tried a different splittable
source, and it didn’t work in Flink either. Every attempt has worked on the
direct runner.
It seems like there’s something wrong in the splittable DoFn logic in the Flink
runner that keeps the watermark from advancing. I’m fairly new to Beam and Flink
and am a little lost at the moment. Should I file a JIRA for this, since it
seems like a defect? I plan to keep investigating, so any suggestions for where
to start looking would be greatly appreciated.
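One thing that may help narrow this down: a Flink operator's input watermark is the minimum of the latest watermarks on its input channels (idle channels aside). A single upstream instance that never emits a watermark therefore holds back every downstream window. The plain-Java sketch below models only that min rule. It ignores Flink's idleness handling, and the class name is hypothetical.

```java
import java.util.Arrays;

// Models an operator combining watermarks from several input channels:
// the effective watermark is the minimum across channels.
class MinWatermarkSketch {
    private final long[] channelWatermarks;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        // Every channel starts with no watermark at all.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Watermarks are monotonic per channel, so a lower value is ignored.
    void onWatermark(int channel, long watermarkMillis) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermarkMillis);
    }

    long combinedWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch op = new MinWatermarkSketch(2);
        op.onWatermark(0, 60_000L);                  // healthy upstream instance
        System.out.println(op.combinedWatermark());  // still Long.MIN_VALUE: channel 1 never advanced
        op.onWatermark(1, 45_000L);
        System.out.println(op.combinedWatermark());  // 45000
    }
}
```

If the Flink UI shows the watermark stuck right after the splittable read step but moving elsewhere, that points toward the read step rather than the windowing or combine.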
I set up an example that has both a splittable and a non-splittable source
(selected via the pipeline options) to make it easier to troubleshoot:
https://github.com/dneth/file-streaming-example/blob/master/src/main/java/Example.java
Thanks
CONFIDENTIALITY NOTICE This message and any included attachments are from
Cerner Corporation and are intended only for the addressee. The information
contained in this message is confidential and may constitute inside or
non-public information under international, federal, or state securities laws.
Unauthorized forwarding, printing, copying, distribution, or use of such
information is strictly prohibited and may be unlawful. If you are not the
addressee, please promptly delete this message and notify the sender of the
delivery error by e-mail or you may call Cerner's corporate offices in Kansas
City, Missouri, U.S.A at (+1) (816)221-1024.