Hi,

I think this might not be a problem. The reason we have this 
DataSink(DiscardingOutputFormat) at the "end" of Flink Batch pipelines is that 
Flink Batch will not execute a chain of operations unless it is terminated 
by a sink. In Beam, it's fine to have a DoFn with no sink after it, 
because the DoFn can itself write data to an external system. In fact, the 
TextIO.write() operation was implemented as a combination of several DoFns, 
last time I checked. I would assume that this "terminator" sink is just 
waiting until the DoFns that do the actual work are done writing. 
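
To illustrate the shape I mean, here's a minimal sketch of such a pipeline 
(the class name, step names, and HDFS paths are placeholders for 
illustration, not taken from your job):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WriteToHdfs {
  public static void main(String[] args) {
    // Pass --runner=FlinkRunner (plus your Flink options) on the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply("Read", TextIO.read().from("hdfs:///path/to/input/*"))
        // TextIO.write() expands into several DoFns that do the actual
        // writing; the Flink runner then attaches the
        // DataSink(DiscardingOutputFormat) only so that Flink Batch
        // executes the chain, not to write anything itself.
        .apply("Write", TextIO.write().to("hdfs:///path/to/output/part"));

    p.run().waitUntilFinish();
  }
}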

Could you maybe post the execution plan that you see in the Flink Dashboard, 
either here or privately to me if you don't want to share it on the ML?

Best,
Aljoscha

> On 2. Aug 2017, at 17:02, Chris Hebert 
> <[email protected]> wrote:
> 
> Hi,
> 
> I ran a Beam+Flink+YARN job with many containers and a high 
> "parallelism.default" parameter in my conf/flink-conf.yaml file.
> 
> That all worked perfectly with all the containers parallelizing all the parts 
> of the job up until the very end at a TextIO.write().
> 
> The last Task running was a "DataSink 
> (org.apache.flink.api.java.io.DiscardingOutputFormat)", during which I could 
> finally observe output being written to HDFS. (The pipeline was in batch 
> mode, reading from the file system, so I'm not entirely shocked that 
> writing the output was saved until the end.)
> 
> The problem is: this Task used only one TaskManager/Container and ran all by 
> itself for the last 15 minutes of the pipeline while all the other 
> TaskManagers/Containers sat idle.
> 
> How can I make sure that this gets parallelized the same as all the other 
> Tasks?
> 
> Should I address this through the Beam API or through some Flink 
> configuration parameter I haven't found yet?
> 
> Is it even possible to have multiple TaskManagers writing the TextIO output 
> to HDFS at the same time?
> 
> Thank you for your help,
> Chris
