I've opened an issue: https://issues.apache.org/jira/browse/BEAM-365
2016-06-07 11:20 GMT+02:00 Aljoscha Krettek <[email protected]>:

> At least for Flink a solution could be to set the parallelism of the
> "WriteBundles" ParDo to the number of shards. Then the behavior would be
> correct. The problem is just that the Flink runner (or any runner) doesn't
> know about the special nature of these three ParDo functions.
>
> On Tue, 7 Jun 2016 at 01:16 Robert Bradshaw <[email protected]> wrote:
>
>> A preferable solution would be to augment the Beam sinks to support
>> these parameters. At the very least, we should probably make running
>> these with fixed shards a loud error in the meantime.
>>
>> On Wed, Jun 1, 2016 at 4:17 AM, Aljoscha Krettek <[email protected]>
>> wrote:
>> > Hi,
>> > the issue is a bit more complicated and involves both the Beam sink API
>> > and the Flink runner.
>> >
>> > I'll have to get a bit into how Beam sinks work. The base class for
>> > sinks is Sink (TextIO.Write gets translated to Write.to(new
>> > TextSink())). Write.to normally gets translated to three ParDo
>> > operations that cooperate to do the writing:
>> >
>> > - "Initialize": performs the initial setup of the Sink; runs only
>> >   once per sink, non-parallel.
>> >
>> > - "WriteBundles": receives the initialized sink on a side input and
>> >   the values to write on the main input. This runs in parallel, so
>> >   for Flink, if you set parallelism=6 you'll get 6 parallel instances
>> >   of this operation at runtime. It does not write to the final file
>> >   location but to an intermediate staging location, and forwards
>> >   information about where it wrote downstream.
>> >
>> > - "Finalize": receives the initialized sink on the main input and the
>> >   information about written files from "WriteBundles" as a side input.
>> >   This also runs only once, non-parallel. Here the intermediate files
>> >   are copied to their final locations based on the sharding template.
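[Editor's note: the three phases described above can be sketched as a plain-Java simulation. This is not Beam code; the class and variable names are illustrative, and the point is only which phases run once and which run per parallel instance.]

```java
import java.util.ArrayList;
import java.util.List;

public class WritePhases {
    public static void main(String[] args) {
        int parallelism = 6;

        // "Initialize": runs exactly once per sink, non-parallel.
        String stagingDir = "/tmp/staging";

        // "WriteBundles": runs in parallel; with parallelism=6 there are
        // 6 instances, each writing one intermediate (staged) file and
        // forwarding its location downstream.
        List<String> staged = new ArrayList<>();
        for (int instance = 0; instance < parallelism; instance++) {
            staged.add(stagingDir + "/bundle-" + instance);
        }

        // "Finalize": runs once, non-parallel; it receives the staged file
        // locations and copies them to final names per the shard template.
        System.out.println(staged.size() + " staged files to finalize");
        // prints "6 staged files to finalize"
    }
}
```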
>> >
>> > The problem is that Write.to() and TextSink, as well as all other
>> > sinks, are not aware of the number of shards. If you set
>> > "withoutSharding()", this sets the shard template to "" (the empty
>> > string) and the number of shards to 1. "WriteBundles", however, is not
>> > aware of this and will still write 6 intermediate files if you set
>> > parallelism=6. In "Finalize" we then copy an intermediate file to the
>> > same final location 6 times, based on the sharding template. The end
>> > result is that you only get one of the six result shards.
>> >
>> > The reason this only occurs in the Flink runner is that all other
>> > runners have special overrides for TextIO.Write and AvroIO.Write that
>> > kick in if sharding control is required. So, for the time being, this
>> > is a Flink runner bug, and we might have to introduce special
>> > overrides as well until this is solved in the general case.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Wed, 1 Jun 2016 at 07:37 Jean-Baptiste Onofré <[email protected]>
>> > wrote:
>> >>
>> >> Yes, just tested, it happens only with the Flink runner.
>> >>
>> >> Agree to create a JIRA.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 06/01/2016 03:41 AM, Davor Bonaci wrote:
>> >> > This will be a runner-specific issue. It would be best to file a
>> >> > JIRA issue for this.
>> >> >
>> >> > On Tue, May 31, 2016 at 9:46 AM, Jean-Baptiste Onofré
>> >> > <[email protected]> wrote:
>> >> >
>> >> > Hi Pawel,
>> >> >
>> >> > does it happen only with the Flink runner? I bet it happens with
>> >> > any runner.
>> >> >
>> >> > Let me take a look.
>> >> >
>> >> > Regards
>> >> > JB
>> >> >
>> >> > On 05/30/2016 01:38 AM, Pawel Szczur wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I'm running a pipeline with the Flink backend, Beam bleeding edge,
>> >> > Oracle Java 1.8, Maven 3.3.3, linux64.
>> >> >
>> >> > The pipeline is run with --parallelism=6.
>> >> >
>> >> > Adding .withoutSharding() causes a TextIO sink to write only one
>> >> > of the shards.
>> >> >
>> >> > Example use:
>> >> >
>> >> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
>> >> >
>> >> > vs.
>> >> >
>> >> > data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats").withoutSharding());
>> >> >
>> >> > Result:
>> >> > Only part of the data is written to the file. Comparing with the
>> >> > sharded output, it appears to be just one of the shard files.
>> >> >
>> >> > Cheers,
>> >> > Pawel
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> [email protected]
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
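[Editor's note: the collision Aljoscha describes can be made concrete with a minimal plain-Java sketch. This is not Beam's actual implementation; `finalName` is a simplified, hypothetical stand-in for shard name template expansion, where withoutSharding() corresponds to an empty template and numShards = 1. With 6 bundles, the sharded path yields 6 distinct final names, while the unsharded path yields a single name that each copy in "Finalize" overwrites, so only one shard survives.]

```java
import java.util.HashSet;
import java.util.Set;

public class ShardCollision {
    // Simplified stand-in for shard name template expansion (illustrative,
    // not Beam's code). withoutSharding() = empty template, numShards = 1.
    static String finalName(String base, int shard, int numShards,
                            boolean withoutSharding) {
        if (withoutSharding) {
            return base; // every bundle resolves to the same fixed path
        }
        return String.format("%s-%05d-of-%05d", base, shard, numShards);
    }

    public static void main(String[] args) {
        int parallelism = 6;
        Set<String> sharded = new HashSet<>();
        Set<String> unsharded = new HashSet<>();
        for (int bundle = 0; bundle < parallelism; bundle++) {
            sharded.add(finalName("/tmp/some-stats", bundle, parallelism, false));
            unsharded.add(finalName("/tmp/some-stats", bundle, 1, true));
        }
        // 6 distinct final names vs. 1: with an empty template, each copy
        // overwrites the previous one, leaving only one result shard.
        System.out.println(sharded.size() + " vs " + unsharded.size());
        // prints "6 vs 1"
    }
}
```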
