Hello,

I’m writing a Flink job that reads heterogeneous data (one row contains
several types that need to be partitioned downstream) from AWS Kinesis and
writes it to an S3 directory structure like
s3://bucket/year/month/day/hour/type. This all works great with
StreamingFileSink in Flink 1.9, but the problem is that I need to let
another application know immediately (or rather “as soon as possible”) when
an “hour” bucket has rolled (i.e. we’re 100% sure it won’t receive any more
data for that hour). Another problem is that the data can be very skewed
across types, e.g. one hour might consist of 90% typeA rows, 9% typeB and
1% typeC.
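For concreteness, the year/month/day/hour/type path derivation could look
roughly like this (plain Java, no Flink dependencies; the class and method
names are just illustrative, not part of any Flink API):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper deriving the bucket path year/month/day/hour/type
// from a record's processing timestamp and its type.
public class BucketPath {
    public static String forRecord(long epochMillis, String type) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format("%04d/%02d/%02d/%02d/%s",
                t.getYear(), t.getMonthValue(), t.getDayOfMonth(), t.getHour(), type);
    }
}
```

In the real job this would sit inside a BucketAssigner implementation and be
prefixed with the bucket name.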

My current plan is to:

1. Split the stream into windows using TumblingProcessingTimeWindows (I
don’t care about event time at all)
2. Assign every row its bucket in a windowing function
3. Write a stateful BucketAssigner that:
3.1. Keeps the last seen window in a mutable variable
3.2. Once it receives a row from a newer window, sends a message to SQS and
advances the window
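Step 3 above could be sketched as a small stateful helper, independent of
the Flink API (the class name and callback are hypothetical; in the real
assigner the callback would publish to SQS):

```java
import java.util.function.LongConsumer;

// Sketch of step 3: track the last seen processing-time window and fire a
// callback when a row from a newer window arrives, meaning the previous
// window will receive no more data on this parallel instance.
public class WindowRollTracker {
    private long lastWindowStart = Long.MIN_VALUE; // mutable per-instance state
    private final long windowSizeMillis;
    private final LongConsumer onWindowRolled;     // receives the closed window's start

    public WindowRollTracker(long windowSizeMillis, LongConsumer onWindowRolled) {
        this.windowSizeMillis = windowSizeMillis;
        this.onWindowRolled = onWindowRolled;
    }

    // Called for every row with its processing timestamp.
    public void observe(long epochMillis) {
        long windowStart = epochMillis - (epochMillis % windowSizeMillis);
        if (lastWindowStart != Long.MIN_VALUE && windowStart > lastWindowStart) {
            onWindowRolled.accept(lastWindowStart); // previous window is closed
        }
        if (windowStart > lastWindowStart) {
            lastWindowStart = windowStart;
        }
    }
}
```

Note that each parallel subtask would see its own "last window", so the
downstream consumer would still need to account for one message per
subtask (or the stream would need to be keyed so one instance owns a bucket).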

My biggest concern is the 3rd point. To me, BucketAssigner looks like a
pure function (Row, Time) -> Bucket, and I’m not sure that introducing
state and side effects there is reasonable. Is there another way to do it?
I’m also wondering how to couple this with the checkpointing mechanism,
since ideally I’d like this callback not to be invoked before the
checkpoint is written.
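One option I’m considering, assuming the detection is moved out of the
BucketAssigner into an operator that implements Flink’s CheckpointListener:
buffer the roll notifications and only send them in
notifyCheckpointComplete. A plain-Java sketch of that buffering (the SQS
client is stubbed as a Consumer; names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch of deferring notifications until a checkpoint has been confirmed,
// mimicking the CheckpointListener#notifyCheckpointComplete contract.
// Delivery is at-least-once, which matches the relaxed semantics I need.
public class CheckpointAlignedNotifier {
    private final Queue<String> pending = new ArrayDeque<>(); // rolled-bucket messages
    private final Consumer<String> sink;                      // e.g. an SQS client

    public CheckpointAlignedNotifier(Consumer<String> sink) {
        this.sink = sink;
    }

    // Called when a window roll is detected; nothing is sent yet.
    public void bucketRolled(String bucket) {
        pending.add(bucket);
    }

    // Called once the checkpoint covering those rolls has completed.
    public void notifyCheckpointComplete() {
        String msg;
        while ((msg = pending.poll()) != null) {
            sink.accept(msg);
        }
    }
}
```

In a real job the pending queue would also have to go into operator state,
otherwise notifications buffered at the time of a failure would be lost
(which may be acceptable given my ~99% delivery requirement).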

StreamingFileSink doesn’t provide many ways to extend it. I tried to
re-implement it for my purposes, but stumbled upon many private methods and
classes, so even though it looks possible, the end result would probably be
too ugly.

To make things a little bit easier: I don’t care too much about the
delivery semantics of those final SQS messages. If I get only ~99% of them,
that’s fine; if some of them are duplicated, that’s also fine.

Regards,
Anton
