Hi Jozef,
This does not look like a FlinkRunner-related problem; it is caused by
the `WriteFiles` sharding logic. It assigns shard keys and then does a
`Reshuffle`, which apparently does not lead to a good data spread in
your case.
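In essence it does something like this (a simplified sketch of the idea,
not the actual `WriteFiles` code; the helper name is made up):

import java.util.concurrent.ThreadLocalRandom
import org.apache.beam.sdk.transforms.{Reshuffle, SerializableFunction, WithKeys}
import org.apache.beam.sdk.values.{KV, PCollection, TypeDescriptors}

// Sketch of fixed sharding: each element is tagged with one of numShards
// keys, then a Reshuffle redistributes the elements by that key. With only
// 5 distinct keys, the runner's hash partitioning of those keys decides
// which workers receive data; nothing guarantees one key per worker.
def assignShards(lines: PCollection[String], numShards: Int): PCollection[KV[Integer, String]] =
  lines
    .apply(WithKeys.of[Integer, String](
      new SerializableFunction[String, Integer] {
        override def apply(e: String): Integer =
          ThreadLocalRandom.current().nextInt(numShards)
      }).withKeyType(TypeDescriptors.integers()))
    .apply(Reshuffle.of[Integer, String]())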
Do you see the same behavior without `withNumShards(5)`?
Thanks,
Max
On 22.10.18 11:57, Jozef Vilcek wrote:
Hello,
I am having some trouble getting a balanced write via FileIO. The
workers on the shuffle side, where the data for each window pane is
written to the filesystem, receive an unbalanced number of events.
Here is a naive code example:
val read = KafkaIO.read[Array[Byte], Array[Byte]]()
  .withTopic("topic")
  .withBootstrapServers("kafka1:9092")
  .withKeyDeserializer(classOf[ByteArrayDeserializer])
  .withValueDeserializer(classOf[ByteArrayDeserializer])
  .withProcessingTime()

pipeline
  .apply(read)
  // decode the Kafka record value as a UTF-8 string
  .apply(MapElements.via(
    new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
      override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String =
        new String(input.getKV.getValue, "UTF-8")
    }))
  // 1-hour fixed windows with early/late firings, discarding fired panes
  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
      .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
        Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardHours(1)))))))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardDays(7)))
  // write each pane to text files, with a fixed number of 5 shards
  .apply(FileIO.write[String]()
    .via(TextIO.sink())
    .withNaming(new SafeFileNaming(outputPath, ".txt"))
    .withTempDirectory(tempLocation)
    .withNumShards(5))
If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
number of shards), I would expect each worker to participate equally in
persisting the shards, since the code uses a fixed number of shards (and
random shard assignment?). But the reality is different (see the 2
attachments: statistics from the Flink task reading from Kafka and from
the task writing to files).
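Just to spell out my expectation (a standalone illustration, not part of
the pipeline):

import scala.util.Random

// The "random shard assign" I expected: uniform random keys over 5 shards
// should give each shard, and hence each of the 5 writers, roughly 20% of
// the events.
val counts = Array.fill(5)(0)
(1 to 1000000).foreach(_ => counts(Random.nextInt(5)) += 1)
println(counts.mkString(", "))  // roughly equal counts, ~200000 each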
What am I missing? How can I achieve balanced writes?
Thanks,
Jozef