Hi KristoffSC,

Firstly, IMO, you can implement this feature by customizing the
`SessionWindowTimeGapExtractor`.

Additionally, let me clarify a concept. A component that implements the
`SessionWindowTimeGapExtractor` interface should not be an operator in
Flink.

In Flink's terms, a window is an operator that contains several components:
an assigner, a trigger, evictors and so on. [1]

In Flink's codebase, I did not find a concrete implementation of this
interface, and it may not be able to access Flink's state. However, you can
still implement this interface yourself and get the new dynamic gap value by
accessing a third-party system (Kafka, Redis, ZooKeeper...). For each
element, when assigning a session window to it, Flink always invokes the
`SessionWindowTimeGapExtractor#extract()` method, so this approach makes
sense.
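
To make the idea concrete, here is a minimal sketch, not a tested Flink job.
Assumptions: the local `SessionWindowTimeGapExtractor` interface below is only
a stand-in with the same shape as Flink's, so that the sketch compiles without
Flink on the classpath; `GapSource` and `RefreshingGapExtractor` are
hypothetical names I made up, and the actual lookup against Redis, ZooKeeper,
etc. is left to whatever `GapSource` you plug in. The extractor caches the gap
and re-reads it at most once per refresh interval, so that `extract()` does not
call the external system for every element.

```java
import java.io.Serializable;

// Stand-in with the same shape as Flink's
// org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor.
// In a real job, import Flink's interface and delete this declaration.
interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical hook for the external lookup (Redis, ZooKeeper, ...).
interface GapSource extends Serializable {
    long fetchGapMillis() throws Exception;
}

// Hypothetical extractor that caches the gap and refreshes it periodically.
class RefreshingGapExtractor<T> implements SessionWindowTimeGapExtractor<T> {
    private static final long serialVersionUID = 1L;

    private final GapSource source;
    private final long refreshIntervalMillis;
    private final long defaultGapMillis;

    // Transient: every deserialized copy starts uninitialized and
    // fetches the gap on its first call to extract().
    private transient boolean initialized;
    private transient long cachedGapMillis;
    private transient long lastRefreshMillis;

    RefreshingGapExtractor(GapSource source, long refreshIntervalMillis, long defaultGapMillis) {
        if (defaultGapMillis <= 0) {
            throw new IllegalArgumentException("default gap must be positive");
        }
        this.source = source;
        this.refreshIntervalMillis = refreshIntervalMillis;
        this.defaultGapMillis = defaultGapMillis;
    }

    @Override
    public long extract(T element) {
        long now = System.currentTimeMillis();
        if (!initialized || now - lastRefreshMillis >= refreshIntervalMillis) {
            if (!initialized) {
                cachedGapMillis = defaultGapMillis;
            }
            try {
                long fetched = source.fetchGapMillis();
                // Ignore non-positive values and keep the previous gap.
                if (fetched > 0) {
                    cachedGapMillis = fetched;
                }
            } catch (Exception e) {
                // Lookup failed: keep the previous (or default) gap.
            }
            lastRefreshMillis = now;
            initialized = true;
        }
        return cachedGapMillis;
    }
}
```

With Flink's real interface, an instance of such a class could be passed to
`ProcessingTimeSessionWindows.withDynamicGap(...)` in place of the anonymous
class in your snippet. Note that the extractor is serialized together with the
window assigner, so the `GapSource` has to be serializable too, and a changed
gap only applies to elements that arrive after the refresh.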

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html

KristoffSC <krzysiek.chmielew...@gmail.com> wrote on Thu, Jan 2, 2020 at 7:37 PM:

> Hi all,
> I'm exploring Flink for our new project.
>
> Currently I'm playing with Session Windows with dynamic Gap. In short, I
> would like to be able to change the value of the gap on demand, for example
> on config update.
>
> So I'm having this code:
>
>
> messageStream
>                 .keyBy(tradeKeySelector)
>                 .window(ProcessingTimeSessionWindows.withDynamicGap(new
>                   SessionWindowTimeGapExtractor<EnrichedMessage>() {
>                     @Override
>                     public long extract(EnrichedMessage element) {
>                         // Try to dynamically change the gap here
>                         // milliseconds.
>                         return 5000;
>                     }
>                 }))
>                 .process(new CumulativeTransactionOperator())
>                 .name("Aggregate Transaction Builder");
>
> I would assume something like "broadcast pattern" here, although this is
> related to operators and we are interested in
> SessionWindowTimeGapExtractor here.
>
> Probably we will keep the gap size in a Flink State, not sure if it has to
> be keyed state or "operator state". Updates will come from external
> system.
>
> So I guess what I need here is actually an operator that implements
> SessionWindowTimeGapExtractor interface. Instance of this operator will
> keep/update the state based on Config updates and returns the gap size like
> SessionWindowTimeGapExtractor.
>
> Would it be a valid approach for this use case? Is there any other way to
> have such a config in Flink state?
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
