Thanks for clarifying. The output is grouped by the shard key before the 
shard files are produced. If you set the number of shards to 1, all of the 
data will go to a single TaskManager.
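
Conceptually (a simplified sketch of the idea only, not the actual 
FileIO/WriteFiles code, and using String elements as a placeholder), the 
write tags each element with a shard number and then groups by that number, 
so every element of a shard ends up on one worker:

    // Simplified sketch only - not the real FileIO internals.
    final int numShards = 1;
    input
        .apply("assign shard", MapElements.via(
            new SimpleFunction<String, KV<Integer, String>>() {
              @Override
              public KV<Integer, String> apply(String value) {
                // Every element gets a shard id in [0, numShards).
                return KV.of(ThreadLocalRandom.current().nextInt(numShards), value);
              }
            }))
        // With numShards == 1 there is exactly one key, so this grouping
        // sends all data to a single subtask.
        .apply("group by shard", GroupByKey.<Integer, String>create());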

For most use cases, e.g. windowed writes, the data is already sharded 
naturally due to a GroupByKey operation. It sounds like you want to write 
this data out directly, without that grouping? Unfortunately, that is not 
possible with the FileIO logic.
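
If a few files per day are acceptable, one thing you could try is raising 
the shard count so that the grouping has more keys to spread across 
subtasks. A rough sketch, with TextIO.sink(), String elements, and an 
"events" PCollection standing in for whatever your pipeline actually 
produces (triggering elided):

    events  // PCollection<String>, produced upstream as in your pipeline
        .apply("window", Window.<String>into(FixedWindows.of(Duration.standardDays(1))))
        .apply("write", FileIO.<String>write()
            .via(TextIO.sink())   // placeholder sink
            .to("s3://...")
            // With 1 shard, every window maps to the same shard key and is
            // written by a single subtask; more shards let the per-window
            // grouping fan out, at the cost of more than one file per window.
            .withNumShards(4)
            .withCompression(Compression.GZIP));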

Cheers,
Max

On 10.06.19 23:30, Mike Kaplinskiy wrote:
> At a high level, the pipeline looks something like this:
>
> pipeline
>     .apply("read kafka",
>         KafkaIO.readBytes()
>             .updateConsumerProperties({"auto.offset.reset": "earliest"}))
>     .apply("xform", MapElements.via(...))
>     .apply("window", Window.into(FixedWindows.of(Duration.standardDays(1)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withEarlyFirings(AfterPane.elementCountAtLeast(40000)))
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardHours(1)))
>     .apply("write", FileIO.write()
>         .to("s3://...")
>         .withNumShards(1)
>         .withCompression(Compression.GZIP))
>
> Mike.
>
> Ladder <http://bit.ly/1VRtWfS>. The smart, modern way to insure your life.
>
>
>
> On Mon, Jun 10, 2019 at 1:47 PM Ankur Goenka <goe...@google.com> wrote:
>
>     Hi Mike, this can happen because of the partitioning logic of the data.
>
>     If possible, could you share your pipeline code at a high level?
>
>     On Mon, Jun 10, 2019 at 12:58 PM Mike Kaplinskiy
>     <m...@ladderlife.com> wrote:
>
>         On Mon, Jun 10, 2019 at 6:51 AM Maximilian Michels
>         <m...@apache.org> wrote:
>
>             Hi Mike,
>
>             If you set the number of shards to 1, you should get one
>             shard per window, unless you have "ignore windows" set to true.
>
>
>         Right, that makes sense and is what I expected. The thing I
>         find a little puzzling is that a single Flink sub-task receives
>         all of the data - I'd expect the actual work to be spread across
>         subtasks, since each window is independent.
>          
>
>
>             > (The way I'm checking this is via the Flink UI)
>
>             I'm curious, how do you check this via the Flink UI?
>
>
>         Attached a screenshot
>          
>
>
>             Cheers,
>             Max
>
>             On 09.06.19 22:33, Mike Kaplinskiy wrote:
>             > Hi everyone,
>             >
>             > I’m using a Kafka source with a lot of watermark skew (i.e.
>             > new partitions were added to the topic over time). The sink
>             > is a FileIO.Write().withNumShards(1) to get ~ 1 file per day
>             > & an early trigger to write at most 40,000 records per file.
>             > Unfortunately it looks like there's only 1 shard trying to
>             > write files for all the various days instead of writing
>             > multiple days' files in parallel. (The way I'm checking this
>             > is via the Flink UI). Is there anything I could do here to
>             > parallelize the process? All of this is with the Flink runner.
>             >
>             > Mike.
>             >
>             >
>
