If I do not specify the number of shards for an unbounded collection, I get:
Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to
an unbounded PCollection, must specify number of output shards explicitly
at
org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
Around the same lines in WriteFiles there is also a check for windowed
writes. I believe FileIO enables them explicitly when windowing is present.
On the filesystem, the written files are per window and shard.
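
To make the constraint concrete, here is a minimal sketch of the write step
(illustration only, not code from this thread): `lines`, `outputPath`,
`tempLocation` and `SafeFileNaming` stand in for the already-windowed
PCollection[String] and the helpers from my example further down. The
`.withNumShards(...)` call is what satisfies the WriteFiles precondition
quoted above, and, as noted, FileIO appears to enable windowed writes itself
once the input is windowed:

import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.values.PCollection

// Sketch only: `lines` is assumed to be the windowed, unbounded
// PCollection[String] from the example below; SafeFileNaming, outputPath and
// tempLocation are the custom naming/paths from that example.
def writeLines(lines: PCollection[String],
               outputPath: String,
               tempLocation: String): Unit = {
  lines.apply(
    FileIO.write[String]()
      .via(TextIO.sink())
      .withNaming(new SafeFileNaming(outputPath, ".txt"))
      .withTempDirectory(tempLocation)
      // Mandatory for an unbounded input: leaving this out triggers the
      // IllegalArgumentException from WriteFiles.expand() quoted above.
      .withNumShards(5))
}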
On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <[email protected]> wrote:
> I agree it would be nice to keep the current distribution of elements
> instead of doing a shuffle based on an artificial shard key.
>
> Have you tried `withWindowedWrites()`? Also, why do you say you need to
> specify the number of shards in streaming mode?
>
> -Max
>
> On 25.10.18 10:12, Jozef Vilcek wrote:
> > Hm, yes, this makes sense now, but what can be done for my case? I do
> > not want to end up with too many files on disk.
> >
> > I think what I am looking for is a way to instruct the IO not to do a
> > random shard assignment and reshuffle again, but to assume that the
> > number of shards equals the number of workers and that the shard ID is
> > the worker ID.
> > Is this doable in the Beam model?
> >
> > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <[email protected]> wrote:
> >
> > The FlinkRunner uses a hash function (MurmurHash) on each key which
> > places keys somewhere in the hash space. The hash space (2^32) is split
> > among the partitions (5 in your case). Given enough keys, the chance
> > increases that they are spread equally.
> >
> > This should be similar to what the other Runners do.
> >
> > On 24.10.18 10:58, Jozef Vilcek wrote:
> > >
> > > So if I run 5 workers with 50 shards, I end up with:
> > >
> > > Duration   Bytes received   Records received
> > > 2m 39s     900 MB           465,525
> > > 2m 39s     1.76 GB          930,720
> > > 2m 39s     789 MB           407,315
> > > 2m 39s     1.32 GB          698,262
> > > 2m 39s     788 MB           407,310
> > >
> > > Still not good, but better than with 5 shards, where some workers did
> > > not participate at all.
> > > So the problem is in some layer which distributes keys / shards among
> > > workers?
> > >
> > > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <[email protected]> wrote:
> > >
> > > withNumShards(5) generates 5 random shards. It turns out that,
> > > statistically, when you generate 5 random shards and you have 5
> > > workers, the probability is reasonably high that some workers will get
> > > more than one shard (and as a result not all workers will
> > > participate). Are you able to set the number of shards larger than 5?
> > >
> > > On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <[email protected]> wrote:
> > >
> > > cc (dev)
> > >
> > > I tried to run the example with the FlinkRunner in batch mode and
> > > again got a bad data spread among the workers.
> > >
> > > When I tried to remove the number of shards for batch mode in the
> > > above example, the pipeline crashed before launch:
> > >
> > > Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> > > incompatible triggers:
> > > AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> > > AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> > >
> > > On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <[email protected]> wrote:
> > >
> > > Hi Max,
> > >
> > > I forgot to mention that the example is run in streaming mode,
> > > therefore I cannot do writes without specifying shards. FileIO
> > > explicitly asks for them.
> > >
> > > I am not sure where the problem is. The FlinkRunner is the only one
> > > I used.
> > >
> > > On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <[email protected]> wrote:
> > >
> > > Hi Jozef,
> > >
> > > This does not look like a FlinkRunner-related problem, but is caused
> > > by the `WriteFiles` sharding logic. It assigns keys and does a
> > > Reshuffle which apparently does not lead to a good data spread in
> > > your case.
> > >
> > > Do you see the same behavior without `withNumShards(5)`?
> > >
> > > Thanks,
> > > Max
> > >
> > > On 22.10.18 11:57, Jozef Vilcek wrote:
> > > > Hello,
> > > >
> > > > I am having some trouble getting a balanced write via FileIO.
> > > > Workers at the shuffle side, where data per window are fired and
> > > > written to the filesystem, receive an unbalanced number of events.
> > > >
> > > > Here is a naive code example:
> > > >
> > > > val read = KafkaIO.read()
> > > >   .withTopic("topic")
> > > >   .withBootstrapServers("kafka1:9092")
> > > >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
> > > >   .withValueDeserializer(classOf[ByteArrayDeserializer])
> > > >   .withProcessingTime()
> > > >
> > > > pipeline
> > > >   .apply(read)
> > > >   .apply(MapElements.via(
> > > >     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> > > >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> > > >         new String(input.getKV.getValue, "UTF-8")
> > > >       }
> > > >     }))
> > > >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> > > >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> > > >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> > > >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> > > >           .plusDelayOf(Duration.standardHours(1)))))))
> > > >     .discardingFiredPanes()
> > > >     .withAllowedLateness(Duration.standardDays(7)))
> > > >   .apply(FileIO.write()
> > > >     .via(TextIO.sink())
> > > >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
> > > >     .withTempDirectory(tempLocation)
> > > >     .withNumShards(5))
> > > >
> > > >
> > > > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
> > > > the number of shards), I would expect each worker to participate in
> > > > persisting shards, and equally so, since the code uses a fixed number
> > > > of shards (and random shard assignment?). But the reality is
> > > > different (see the 2 attachments - statistics from the Flink task
> > > > reading from Kafka and the task writing to files).
> > > >
> > > > What am I missing? How can I achieve balanced writes?
> > > >
> > > > Thanks,
> > > > Jozef
> > > >
> > > >
> > >
> >
>
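
Side note (illustration, not from the original thread): a quick
back-of-the-envelope check of Reuven's point above. If each of the 5 random
shard keys is assumed to hash uniformly and independently onto one of the 5
workers, the probability that every worker receives exactly one shard is
5!/5^5 ≈ 3.8%, so some workers sitting idle is the expected outcome rather
than bad luck:

// Sketch only: probability that `shards` uniformly random shard keys land on
// `shards` distinct workers (assumes workers >= shards).
object ShardSpreadCheck {
  def pAllDistinct(workers: Int, shards: Int): Double =
    (0 until shards).map(i => (workers - i).toDouble / workers).product

  def main(args: Array[String]): Unit =
    // prints: P(every worker gets exactly one of 5 shards) = 0.0384
    println(f"P(every worker gets exactly one of 5 shards) = ${pAllDistinct(5, 5)}%.4f")
}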