Hi,
sorry for the inconvenience! I'm sure we can find a solution together.
Why do you need to keep state in the watermark assigner? The Kafka
source already maintains a watermark per partition on its own, so just
specifying a WatermarkStrategy will correctly compute the watermark
per partition and then combine them.
Best,
Aljoscha
On 20.08.20 08:08, Till Rohrmann wrote:
Hi Theo,
thanks for reaching out to the community. I am pulling in Aljoscha and Klou
who have worked on the new WatermarkStrategy/WatermarkGenerator abstraction
and might be able to help you with your problem. At the moment, it looks to
me as if there is no way to combine state with the new WatermarkGenerator
abstraction.
Cheers,
Till
On Wed, Aug 19, 2020 at 3:37 PM Theo Diefenthal <
theo.diefent...@scoop-software.de> wrote:
Hi there,
Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11.
In Flink 1.9, I was able to write an AssignerWithPeriodicWatermarks which
also extended AbstractRichFunction and could thus use state and
getRuntimeContext() in there. This worked because the
TimestampsAndWatermarksOperator was an AbstractUdfStreamOperator and passed
my assigner in as the userFunction to that operator.
I used this feature for some "per partition processing", which Flink
isn't ideally suited for at the moment, I guess. We have ascending
watermarks per Kafka partition and do some processing on that. In order to
maintain state per Kafka partition, I now keyBy the Kafka partition in our
stream (not ideal, but better than operator state in terms of rescaling), but
afterwards I need to emulate the watermark strategy of the initial Kafka
source, i.e. reassign watermarks the same way the Kafka source did (per
Kafka partition within the operator). Via getRuntimeContext() I was able
to identify the Kafka partitions one operator instance was responsible for
and could produce the output watermark accordingly (the minimum over all
responsible partitions).
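To make the "min over all responsible partitions" logic concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names (PartitionWatermarkTracker, addPartition, onEvent, currentWatermark) are hypothetical and only for illustration. As a simplification, it treats the highest timestamp seen in a partition as that partition's watermark, which may differ in detail from what Flink's built-in generators emit.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustration only: combines per-partition watermarks by taking their minimum.
 * All names are hypothetical; this is not Flink API.
 */
class PartitionWatermarkTracker {
    // Highest timestamp seen so far for each partition this instance handles.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    /** Registers a partition. Until it sees an event, it holds the combined watermark back. */
    void addPartition(int partition) {
        maxTimestampPerPartition.putIfAbsent(partition, Long.MIN_VALUE);
    }

    /** Records an event timestamp for a partition, keeping the maximum seen so far. */
    void onEvent(int partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Long::max);
    }

    /** Combined watermark: the minimum over all per-partition watermarks. */
    long currentWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE; // no partitions assigned yet
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTimestampPerPartition.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }
}
```

With partitions 0 and 1 registered, the combined watermark stays at Long.MIN_VALUE until both partitions have seen an event. After events (0, 100) and (1, 50) it is 50, and it only advances to 100 once partition 1 moves past 100.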
In Flink 1.11, how can I rebuild this behavior? Do I really need to build
my own TimestampsAndWatermarksOperator which works like the old one? Or is
there a better approach?
Best regards
Theo