Hi Jozef,

This does not look like a FlinkRunner-related problem; it is caused by the `WriteFiles` sharding logic, which assigns shard keys and does a Reshuffle that apparently does not lead to a good data spread in your case.
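Roughly, the behavior can be pictured with the sketch below. It is illustrative only, not the actual `WriteFiles` internals: the random shard assignment itself is balanced, but routing only a handful of distinct shard keys to workers by hash can leave some workers with several shards and others with none. The names (`numShards`, `numWorkers`, the `"shard-"` key prefix) are made up for the example.

```scala
import scala.util.Random

// Illustrative sketch of fixed-shard writing (NOT the real WriteFiles code):
// each element gets a random shard id in [0, numShards), then elements are
// routed to workers by hashing that shard key.
val numShards  = 5
val numWorkers = 5

val elements = (1 to 100000).map(_.toString)

// Step 1: random shard assignment -- this part spreads data evenly
// across the shard ids.
val sharded: Seq[(Int, String)] =
  elements.map(e => (Random.nextInt(numShards), e))

// Step 2: hash-partition the shard keys over workers. With only 5
// distinct keys there is no law of large numbers to help: several
// keys can land on the same worker while others get none, so the
// write step can look skewed even though step 1 was uniform.
val byWorker: Map[Int, Seq[(Int, String)]] =
  sharded.groupBy { case (shard, _) =>
    s"shard-$shard".hashCode.abs % numWorkers
  }

byWorker.toSeq.sortBy(_._1).foreach { case (worker, elems) =>
  println(s"worker $worker -> ${elems.size} elements")
}
```

The point of the sketch: increasing parallelism does not help once the number of distinct shard keys is small, because the key-to-worker hashing, not the per-element randomness, decides the spread.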

Do you see the same behavior without `withNumShards(5)`?

Thanks,
Max

On 22.10.18 11:57, Jozef Vilcek wrote:
Hello,

I am having trouble getting a balanced write via FileIO. On the shuffle side, where data for each window pane is written to the filesystem, workers receive an unbalanced number of events.

Here is a naive code example:

    val read = KafkaIO.read()
        .withTopic("topic")
        .withBootstrapServers("kafka1:9092")
        .withKeyDeserializer(classOf[ByteArrayDeserializer])
        .withValueDeserializer(classOf[ByteArrayDeserializer])
        .withProcessingTime()

    pipeline
        .apply(read)
        .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
          override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
            new String(input.getKV.getValue, "UTF-8")
          }
        }))
        .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
                .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
                    Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
                    Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardHours(1)))))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.standardDays(7)))
        .apply(FileIO.write()
            .via(TextIO.sink())
            .withNaming(new SafeFileNaming(outputPath, ".txt"))
            .withTempDirectory(tempLocation)
            .withNumShards(5))


If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the number of shards), I would expect each worker to participate equally in persisting shards, since the code uses a fixed number of shards (and random shard assignment?). But reality is different (see the 2 attachments: statistics from the Flink task reading from Kafka and the task writing to files).

What am I missing? How to achieve balanced writes?

Thanks,
Jozef

