Hi,

I think this might not be a problem. The reason we have this DataSink(DiscardingOutputFormat) at the "end" of Flink Batch pipelines is that Flink Batch will not execute a chain of operations unless it is terminated by a sink. In Beam, it's fine to have a DoFn with no sink after it, because the DoFn can itself write data to an external system. In fact, last time I checked, the TextIO.write() operation was implemented as a combination of several DoFns. I would assume that this "terminator" sink just waits until the DoFns that do the actual work are done writing.
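For illustration, here is a minimal sketch of that pattern in the plain Flink DataSet API (not the runner's actual code, just the underlying idea): the map function stands in for a DoFn that writes as a side effect, and without the output() call at the end Flink Batch would have nothing to execute.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;

    public class DiscardingSinkSketch {
      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for a DoFn that does its own writing as a side effect;
        // the returned elements are not needed downstream.
        DataSet<String> written = env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
              @Override
              public String map(String value) {
                // imagine the actual write to HDFS/an external system here
                return value;
              }
            });

        // Flink Batch only executes operator chains that end in a sink, so a
        // discarding sink is attached purely to trigger execution; it throws
        // the elements away.
        written.output(new DiscardingOutputFormat<String>());

        env.execute("discarding-sink-sketch");
      }
    }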
Could you maybe post the execution plan that you see in the Flink Dashboard? Either here or in private to me, if you don't want to share it on the ML.

Best,
Aljoscha

> On 2. Aug 2017, at 17:02, Chris Hebert <[email protected]> wrote:
>
> Hi,
>
> I ran a Beam+Flink+YARN job with many containers and a high
> "parallelism.default" parameter in my conf/flink-conf.yaml file.
>
> That all worked perfectly, with all the containers parallelizing all the
> parts of the job up until the very end at a TextIO.write().
>
> The last Task running was a "DataSink
> (org.apache.flink.api.java.io.DiscardingOutputFormat)", during which I could
> finally observe output being written to HDFS. (The pipeline was in batch
> mode reading from the file system, so I'm not entirely shocked that writing
> the output was saved until the end.)
>
> The problem is: this Task used only one TaskManager/Container and ran all by
> itself for the last 15 minutes of the pipeline while all the other
> TaskManagers/Containers sat idle.
>
> How can I make sure that this gets parallelized the same as all the other
> Tasks?
>
> Should I address this through the Beam API or through some Flink
> configuration parameter I haven't found yet?
>
> Is it even possible to have multiple TaskManagers writing the TextIO output
> to HDFS at the same time?
>
> Thank you for your help,
> Chris
