I've opened an issue:
https://issues.apache.org/jira/browse/BEAM-365

2016-06-07 11:20 GMT+02:00 Aljoscha Krettek <[email protected]>:

> At least for Flink, one solution could be to set the parallelism of the
> "WriteBundles" ParDo to the number of shards. Then the behavior would be
> correct. The only problem is that the Flink runner (or any runner) doesn't
> know about the special nature of these three ParDo functions.
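A minimal sketch of the parallelism suggestion above, under stated assumptions. The class and method names are hypothetical, not part of the Flink runner or Beam. `numShards <= 0` is assumed to mean that no fixed shard count was requested, which we believe matches Beam's convention that 0 lets the runner decide.

```java
/**
 * Hypothetical helper (not a Flink runner or Beam API) illustrating the idea
 * of sizing "WriteBundles" by the requested shard count.
 * Assumption: numShards <= 0 means "no fixed sharding requested".
 */
final class WriteBundlesParallelism {
    private WriteBundlesParallelism() {}

    /**
     * Returns how many parallel "WriteBundles" instances to run: exactly
     * numShards when a fixed count was requested (e.g. 1 for
     * withoutSharding()), otherwise the pipeline-wide parallelism.
     */
    static int choose(int pipelineParallelism, int numShards) {
        if (pipelineParallelism < 1) {
            throw new IllegalArgumentException(
                "parallelism must be >= 1, got " + pipelineParallelism);
        }
        return numShards > 0 ? numShards : pipelineParallelism;
    }
}
```

With `--parallelism=6`, `choose(6, 1)` yields a single writer (hence one intermediate file), while `choose(6, 0)` keeps all six.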
>
> On Tue, 7 Jun 2016 at 01:16 Robert Bradshaw <[email protected]> wrote:
>
>> A preferable solution would be to augment the Beam sinks to support
>> these parameters. At the very least, we should probably make running
>> these with fixed shards a loud error in the meantime.
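One way to read the "loud error" suggestion, sketched as a hypothetical check a runner might perform while translating a write. `FixedShardingCheck`, `validate`, and its parameters are illustrative names, not an existing API.

```java
/**
 * Hypothetical translation-time guard (not an actual Beam or Flink runner
 * API): fail fast instead of silently dropping output.
 */
final class FixedShardingCheck {
    private FixedShardingCheck() {}

    static void validate(int numShards, boolean runnerHonorsFixedSharding) {
        // Assumption: numShards <= 0 means runner-determined sharding, which is always safe.
        if (numShards > 0 && !runnerHonorsFixedSharding) {
            throw new UnsupportedOperationException(
                "This runner cannot honor a fixed number of shards (numShards=" + numShards
                    + "); failing instead of silently writing only part of the output.");
        }
    }
}
```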
>>
>> On Wed, Jun 1, 2016 at 4:17 AM, Aljoscha Krettek <[email protected]> wrote:
>> > Hi,
>> > the issue is a bit more complicated and involves the Beam sink API and the
>> > Flink runner.
>> >
>> > I'll have to get a bit into how Beam sinks work. The base class for sinks is
>> > Sink (TextIO.Write gets translated to Write.to(new TextSink())). Write.to
>> > normally gets translated to three ParDo operations that cooperate to do the
>> > writing:
>> >
>> >  - "Initialize": this performs the one-time initialization of the Sink. It
>> > runs only once per sink, non-parallel.
>> >
>> >  - "WriteBundles": this gets an initialized sink on a side input and the
>> > values to write on the main input. This runs in parallel, so for Flink, if
>> > you set parallelism=6 you'll get 6 parallel instances of this operation at
>> > runtime. This operation forwards information about where it wrote to
>> > downstream. It does not write to the final file location but to an
>> > intermediate staging location.
>> >
>> >  - "Finalize": this gets the initialized sink on the main input and the
>> > information about written files from "WriteBundles" as a side input. This
>> > also runs only once, non-parallel. Here the intermediate files are copied
>> > to their final location, named according to the sharding template.
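To make the data flow of the three steps concrete, here is a rough in-memory model in plain Java. Everything in it is hypothetical: the "file system" is a map, the parallel "WriteBundles" instances are threads, and final names use an assumed default-style `-SSSSS-of-NNNNN` pattern rather than Beam's real file-naming code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical in-memory model of Initialize / WriteBundles / Finalize.
 * Not Beam's Write/Sink code; the "file system" maps a path to its lines.
 */
final class ThreeStepWrite {
    private ThreeStepWrite() {}

    // "Initialize": runs once, non-parallel; here it only chooses a staging directory.
    static String initialize(String outputPrefix) {
        return outputPrefix + ".temp/";
    }

    // "WriteBundles": one call per parallel instance; writes a staging file and reports its path.
    static String writeBundle(String stagingDir, int instance, List<String> bundle,
                              Map<String, List<String>> fs) {
        String path = stagingDir + "bundle-" + instance;
        fs.put(path, new ArrayList<>(bundle));
        return path;
    }

    // "Finalize": runs once; moves every staging file to a shard-numbered final name.
    static void finalizeWrite(String outputPrefix, List<String> staged,
                              Map<String, List<String>> fs) {
        for (int i = 0; i < staged.size(); i++) {
            String finalPath = String.format("%s-%05d-of-%05d", outputPrefix, i, staged.size());
            fs.put(finalPath, fs.remove(staged.get(i)));
        }
    }

    static Map<String, List<String>> run(List<String> records, int parallelism, String outputPrefix) {
        Map<String, List<String>> fs = new ConcurrentHashMap<>();
        String stagingDir = initialize(outputPrefix);
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int w = 0; w < parallelism; w++) {
                final int instance = w;
                List<String> bundle = new ArrayList<>();
                for (int r = w; r < records.size(); r += parallelism) {
                    bundle.add(records.get(r)); // round-robin split of the main input
                }
                pending.add(pool.submit(() -> writeBundle(stagingDir, instance, bundle, fs)));
            }
            // Finalize sees every staging path, playing the role of the side input.
            List<String> staged = new ArrayList<>();
            for (Future<String> f : pending) {
                staged.add(f.get());
            }
            finalizeWrite(outputPrefix, staged, fs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return fs;
    }
}
```

With 12 records and `parallelism=6`, `run` should leave six final files holding all 12 records between them, with the staging files removed.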
>> >
>> > The problem is that Write.to() and TextSink, as well as all other sinks,
>> > are not aware of the number of shards. Setting "withoutSharding()" sets the
>> > shard template to "" (empty string) and the number of shards to 1.
>> > "WriteBundles", however, is not aware of this and will write 6
>> > intermediate files if you set parallelism=6. In "Finalize", each of these
>> > intermediate files is then copied to the same final location, because the
>> > empty sharding template yields the same name for all 6 of them. The end
>> > result is that you only get one of the six result shards.
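The collision can be reproduced in a few lines. This is a simplified, hypothetical model of the "Finalize" copy step, not Beam's code: an empty template adds no suffix, and any non-empty template is treated like the default `-SSSSS-of-NNNNN` pattern.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the "Finalize" copy step with a simplified naming
 * rule (assumption, not Beam's file-naming code).
 */
final class EmptyTemplateCollision {
    private EmptyTemplateCollision() {}

    // Empty template: no suffix, so every shard maps to the bare prefix.
    static String finalName(String prefix, String shardTemplate, int shard, int numFiles) {
        return shardTemplate.isEmpty()
            ? prefix
            : prefix + String.format("-%05d-of-%05d", shard, numFiles);
    }

    /** Copies each intermediate file to its final name; a later copy to the same name overwrites an earlier one. */
    static Map<String, String> finalizeCopies(String prefix, String shardTemplate, int intermediateFiles) {
        Map<String, String> fs = new HashMap<>();
        for (int i = 0; i < intermediateFiles; i++) {
            fs.put(finalName(prefix, shardTemplate, i, intermediateFiles),
                   "contents of intermediate file " + i);
        }
        return fs;
    }
}
```

With the empty template and six intermediate files, `finalizeCopies("/tmp/some-stats", "", 6)` ends up with a single entry holding only the last file copied; with a non-empty template it keeps six distinct files.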
>> >
>> > The reason this only occurs with the Flink runner is that all other
>> > runners have special overrides for TextIO.Write and AvroIO.Write that kick
>> > in if sharding control is required. So, for the time being, this is a Flink
>> > runner bug, and we might have to introduce special overrides as well until
>> > this is solved in the general case.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Wed, 1 Jun 2016 at 07:37 Jean-Baptiste Onofré <[email protected]> wrote:
>> >>
>> >> Yes, I just tested it: it happens only with the Flink runner.
>> >>
>> >> I agree we should create a JIRA issue.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 06/01/2016 03:41 AM, Davor Bonaci wrote:
>> >> > This will be a runner-specific issue. It would be best to file a
>> >> > JIRA issue for this.
>> >> >
>> >> > On Tue, May 31, 2016 at 9:46 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>> >> >
>> >> >     Hi Pawel,
>> >> >
>> >> >     Does it happen only with the Flink runner? I bet it happens with
>> >> >     any runner.
>> >> >
>> >> >     Let me take a look.
>> >> >
>> >> >     Regards
>> >> >     JB
>> >> >
>> >> >     On 05/30/2016 01:38 AM, Pawel Szczur wrote:
>> >> >
>> >> >         Hi,
>> >> >
>> >> >         I'm running a pipeline with the Flink backend, bleeding-edge Beam,
>> >> >         Oracle Java 1.8, Maven 3.3.3, linux64.
>> >> >
>> >> >         The pipeline is run with --parallelism=6.
>> >> >
>> >> >         Adding .withoutSharding() causes a TextIO sink to write only one
>> >> >         of the shards.
>> >> >
>> >> >         Example use:
>> >> >
>> >> >         data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats"));
>> >> >
>> >> >         vs.
>> >> >
>> >> >         data.apply(TextIO.Write.named("write-debug-csv").to("/tmp/some-stats").withoutSharding());
>> >> >
>> >> >         Result:
>> >> >         Only part of the data is written to the file. After comparing it
>> >> >         to the sharded output, it seems to be just one of the shard files.
>> >> >
>> >> >         Cheers,
>> >> >         Pawel
>> >> >
>> >> >
>> >> >     --
>> >> >     Jean-Baptiste Onofré
>> >> >     [email protected]
>> >> >     http://blog.nanthrax.net
>> >> >     Talend - http://www.talend.com
>> >> >
>> >> >
>> >>
>>
>
