I think it's worth adding a URN for the operation of distributing "evenly" into an "appropriate" number of shards. A naive implementation would add random keys and do a ReshufflePerKey, but runners could override this to do a reshuffle and then key by whatever notion of bundle/worker/shard identifier they have that lines up with the number of actual workers.
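For concreteness, a minimal sketch of what that naive default expansion could look like (the transform name DistributeEvenly is made up here, and this is illustrative Scala against the Java SDK, not the actual WriteFiles/SDK code):

import java.util.concurrent.ThreadLocalRandom

import org.apache.beam.sdk.coders.{KvCoder, VarIntCoder}
import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo, Reshuffle}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.values.{KV, PCollection}

// Hypothetical name, purely illustrative: distribute elements "evenly" into
// numShards groups by tagging them with a random shard id and reshuffling.
class DistributeEvenly[T](numShards: Int) extends PTransform[PCollection[T], PCollection[T]] {

  override def expand(input: PCollection[T]): PCollection[T] =
    input
      // Tag each element with a random shard id in [0, numShards).
      .apply("AssignRandomShard", ParDo.of(new DoFn[T, KV[Integer, T]]() {
        @ProcessElement
        def process(c: ProcessContext): Unit =
          c.output(KV.of(Int.box(ThreadLocalRandom.current().nextInt(numShards)), c.element()))
      }))
      .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder))
      // The extra shuffle: group/redistribute by the artificial shard key.
      .apply("ReshufflePerShard", Reshuffle.of[Integer, T]())
      // Drop the artificial shard key again.
      .apply("DropShardKey", ParDo.of(new DoFn[KV[Integer, T], T]() {
        @ProcessElement
        def process(c: ProcessContext): Unit = c.output(c.element().getValue)
      }))
      .setCoder(input.getCoder)
}

A runner that recognizes the URN could swap this expansion for one keyed by its own bundle/worker/shard identifier and skip the extra shuffle entirely.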
On Fri, Oct 26, 2018 at 11:34 AM Jozef Vilcek <[email protected]> wrote:
> Thanks for the JIRA. If I understand it correctly ... so runner-determined
> sharding will avoid the extra shuffle? Will it just write worker-local
> available data to its shard? Something similar to coalesce in Spark?
>
> On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels <[email protected]> wrote:
>> Oh ok, thanks for the pointer. Coming from Flink, the default is that
>> the sharding is determined by the runtime distribution. Indeed, we will
>> have to add an override to the Flink Runner, similar to this one:
>>
>> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>>
>> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>>
>> Thanks,
>> Max
>>
>> On 25.10.18 22:37, Reuven Lax wrote:
>>> FYI the Dataflow runner automatically sets the default number of shards
>>> (I believe to 2 * num_workers). Probably we should do something
>>> similar for the Flink runner.
>>>
>>> This needs to be done by the runner, as # of workers is a runner
>>> concept; the SDK itself has no concept of workers.
>>>
>>> On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek <[email protected]> wrote:
>>>> If I do not specify shards for an unbounded collection, I get
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: When applying
>>>> WriteFiles to an unbounded PCollection, must specify number of
>>>> output shards explicitly
>>>>   at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>>>>   at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
>>>>
>>>> Around the same lines in WriteFiles there is also a check for windowed
>>>> writes. I believe FileIO enables it explicitly when windowing is present.
>>>> In the filesystem, written files are per window and shard.
>>>>
>>>> On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <[email protected]> wrote:
>>>>> I agree it would be nice to keep the current distribution of elements
>>>>> instead of doing a shuffle based on an artificial shard key.
>>>>>
>>>>> Have you tried `withWindowedWrites()`? Also, why do you say you need to
>>>>> specify the number of shards in streaming mode?
>>>>>
>>>>> -Max
>>>>>
>>>>> On 25.10.18 10:12, Jozef Vilcek wrote:
>>>>>> Hm, yes, this makes sense now, but what can be done for my case? I do
>>>>>> not want to end up with too many files on disk.
>>>>>>
>>>>>> I think what I am looking for is a way to instruct the IO not to do the
>>>>>> random sharding and reshuffle again, but to assume the number of shards
>>>>>> is equal to the number of workers and the shard ID is a worker ID.
>>>>>> Is this doable in the Beam model?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <[email protected]> wrote:
>>>>>>> The FlinkRunner uses a hash function (MurmurHash) on each key which
>>>>>>> places keys somewhere in the hash space. The hash space (2^32) is split
>>>>>>> among the partitions (5 in your case). Given enough keys, the chance
>>>>>>> increases that they are equally spread.
>>>>>>>
>>>>>>> This should be similar to what the other Runners do.
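To illustrate the effect Max describes above, here is a toy Scala simulation (not Flink's actual key-group assignment): hash a set of distinct shard keys onto 5 partitions and check how often at least one partition receives nothing. With only 5 keys an empty partition is the common case; with 50 keys it practically never happens (though, as the numbers further down show, the load can still be uneven).

import scala.util.Random
import scala.util.hashing.MurmurHash3

object ShardSpread {
  val NumPartitions = 5

  // Map a key's murmur hash onto one of the partitions.
  def partitionOf(key: Long): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString), NumPartitions)

  // Fraction of trials in which some partition receives no shard key at all.
  def emptyPartitionRate(numKeys: Int, trials: Int = 10000): Double = {
    val rnd = new Random(42)
    val empties = (1 to trials).count { _ =>
      val used = (1 to numKeys).map(_ => partitionOf(rnd.nextLong())).toSet
      used.size < NumPartitions
    }
    empties.toDouble / trials
  }

  def main(args: Array[String]): Unit = {
    println(f"5 shard keys:  some partition empty in ${emptyPartitionRate(5) * 100}%.1f%% of trials")
    println(f"50 shard keys: some partition empty in ${emptyPartitionRate(50) * 100}%.1f%% of trials")
  }
}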
>>>>>>>
>>>>>>> On 24.10.18 10:58, Jozef Vilcek wrote:
>>>>>>>> So if I run 5 workers with 50 shards, I end up with:
>>>>>>>>
>>>>>>>> Duration   Bytes received   Records received
>>>>>>>> 2m 39s     900 MB           465,525
>>>>>>>> 2m 39s     1.76 GB          930,720
>>>>>>>> 2m 39s     789 MB           407,315
>>>>>>>> 2m 39s     1.32 GB          698,262
>>>>>>>> 2m 39s     788 MB           407,310
>>>>>>>>
>>>>>>>> Still not good, but better than with 5 shards, where some workers did
>>>>>>>> not participate at all.
>>>>>>>> So, the problem is in some layer which distributes keys / shards among
>>>>>>>> workers?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>> withNumShards(5) generates 5 random shards. It turns out that
>>>>>>>>> statistically, when you generate 5 random shards and you have 5
>>>>>>>>> workers, the probability is reasonably high that some workers will
>>>>>>>>> get more than one shard (and as a result not all workers will
>>>>>>>>> participate). Are you able to set the number of shards larger than 5?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <[email protected]> wrote:
>>>>>>>>>> cc (dev)
>>>>>>>>>>
>>>>>>>>>> I tried to run the example with FlinkRunner in batch mode and again
>>>>>>>>>> saw a bad data spread among the workers.
>>>>>>>>>>
>>>>>>>>>> When I tried to remove the number of shards for batch mode in the
>>>>>>>>>> above example, the pipeline crashed before launch:
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>>>>>>>>>> incompatible triggers:
>>>>>>>>>> AfterWatermark.pastEndOfWindow()
>>>>>>>>>>   .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>>>>>>>>   .withLateFirings(AfterFirst.of(
>>>>>>>>>>     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>>>>>>>>     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
>>>>>>>>>> AfterWatermark.pastEndOfWindow()
>>>>>>>>>>   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>>>>>>>>>>   .withLateFirings(AfterFirst.of(
>>>>>>>>>>     Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>>>>>>>>>>     Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <[email protected]> wrote:
>>>>>>>>>>> Hi Max,
>>>>>>>>>>>
>>>>>>>>>>> I forgot to mention that the example is run in streaming mode,
>>>>>>>>>>> therefore I cannot do writes without specifying shards. FileIO
>>>>>>>>>>> explicitly asks for them.
>>>>>>>>>>>
>>>>>>>>>>> I am not sure where the problem is. FlinkRunner is the only one I
>>>>>>>>>>> used.
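Reuven's point above can be made exact with a simple balls-into-bins model (plain Scala, not Beam code): if each of the 5 shard keys independently ends up on one of 5 workers, the probability that every worker gets at least one shard is 5!/5^5, roughly 3.8%, so some idle workers is the expected outcome rather than bad luck.

object ShardAssignmentOdds {
  // P(all workers get >= 1 shard) when shards == workers == n and assignment
  // of each shard key to a worker is uniform at random.
  def allWorkersGetAShard(n: Int): Double =
    (1 to n).map(_.toDouble).product / math.pow(n, n)

  def main(args: Array[String]): Unit =
    println(f"n = 5: ${allWorkersGetAShard(5) * 100}%.1f%% chance that all workers participate")
}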
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>> Hi Jozef,
>>>>>>>>>>>>
>>>>>>>>>>>> This does not look like a FlinkRunner related problem, but is
>>>>>>>>>>>> caused by the `WriteFiles` sharding logic. It assigns keys and
>>>>>>>>>>>> does a Reshuffle which apparently does not lead to a good data
>>>>>>>>>>>> spread in your case.
>>>>>>>>>>>>
>>>>>>>>>>>> Do you see the same behavior without `withNumShards(5)`?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Max
>>>>>>>>>>>>
>>>>>>>>>>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am having some trouble getting a balanced write via FileIO.
>>>>>>>>>>>>> Workers at the shuffle side, where data per window fire and are
>>>>>>>>>>>>> written to the filesystem, receive an unbalanced number of events.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here is a naive code example:
>>>>>>>>>>>>>
>>>>>>>>>>>>> val read = KafkaIO.read()
>>>>>>>>>>>>>   .withTopic("topic")
>>>>>>>>>>>>>   .withBootstrapServers("kafka1:9092")
>>>>>>>>>>>>>   .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>>>>>>>>>>>   .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>>>>>>>>>>>   .withProcessingTime()
>>>>>>>>>>>>>
>>>>>>>>>>>>> pipeline
>>>>>>>>>>>>>   .apply(read)
>>>>>>>>>>>>>   .apply(MapElements.via(
>>>>>>>>>>>>>     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>>>>>>>>>>>       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>>>>>>>>>>>>>         new String(input.getKV.getValue, "UTF-8")
>>>>>>>>>>>>>       }
>>>>>>>>>>>>>     }))
>>>>>>>>>>>>>   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>>>>>>>>>>>     .triggering(AfterWatermark.pastEndOfWindow()
>>>>>>>>>>>>>       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>>>>>>>>>>>       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>>>>>>>>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>>>>>>>>>>>         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>>>>>           .plusDelayOf(Duration.standardHours(1)))))))
>>>>>>>>>>>>>     .discardingFiredPanes()
>>>>>>>>>>>>>     .withAllowedLateness(Duration.standardDays(7)))
>>>>>>>>>>>>>   .apply(FileIO.write()
>>>>>>>>>>>>>     .via(TextIO.sink())
>>>>>>>>>>>>>     .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>>>>>>>>>>>     .withTempDirectory(tempLocation)
>>>>>>>>>>>>>     .withNumShards(5))
>>>>>>>>>>>>>
>>>>>>>>>>>>> If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal
>>>>>>>>>>>>> to the number of shards), I would expect that each worker will
>>>>>>>>>>>>> participate in persisting shards equally, since the code uses a
>>>>>>>>>>>>> fixed number of shards (and random shard assignment?). But the
>>>>>>>>>>>>> reality is different (see the 2 attachments - statistics from the
>>>>>>>>>>>>> Flink task reading from Kafka and the task writing to files).
>>>>>>>>>>>>>
>>>>>>>>>>>>> What am I missing? How to achieve balanced writes?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Jozef
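Pulling the thread's suggestions together: until runner-determined sharding exists, the most direct mitigation in user code is to over-provision shards relative to workers, in the spirit of the 2 * num_workers default mentioned for Dataflow. A sketch against the example above, where numWorkers is a value the user has to supply because the SDK has no notion of workers:

import org.apache.beam.sdk.io.{FileIO, TextIO}

// Hypothetical: set this to the parallelism the job actually runs with; the SDK
// itself cannot know it.
val numWorkers = 5

val write = FileIO.write[String]()
  .via(TextIO.sink())
  // ... same withNaming(...) / withTempDirectory(...) settings as in the original example ...
  .withNumShards(2 * numWorkers) // more shards than workers makes a skewed hash assignment less likely

With more shard keys than workers, an entirely idle worker becomes unlikely, at the cost of more output files per window.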
