I don't want to speak for Apache Flink, since I'm using it only via Apache Beam, but generally speaking, each key has to be held in state until the moment it can be garbage collected. That moment is defined (at least in the Apache Beam case) as the timestamp of the end of the window plus the allowed lateness. So, in the case of the global window, it is (practically) forever in the future, yes.

You can clean up the state manually, though. If you use the UUID (or similar) approach, you would set a timer for the (relative) 15-minute interval, and after you emit the data, you can clear the timer and the value state, which should clear the complete state of the window (someone please correct me if I'm wrong).
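
To illustrate the "set a timer, emit, then clear" pattern described above, here is a minimal plain-Java model. It deliberately does not use the Flink or Beam APIs; DelayBuffer, add, advanceTo and pendingTimers are hypothetical names invented for this sketch. In a real pipeline the same steps would presumably live in the timer callback of a keyed function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "set a timer, emit when it fires, then clear the state".
// It does not use the Flink or Beam APIs; all names here are hypothetical.
final class DelayBuffer<T> {
    private final long delayMillis;
    // Pending "timers": fire time -> values waiting for that time.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    DelayBuffer(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Store the value and register a timer relative to its own timestamp.
    void add(T value, long executedAtMillis) {
        pending.computeIfAbsent(executedAtMillis + delayMillis, k -> new ArrayList<>())
               .add(value);
    }

    // Fire every timer that is due at nowMillis, emit its values, and drop its state.
    List<T> advanceTo(long nowMillis) {
        List<T> emitted = new ArrayList<>();
        Map<Long, List<T>> due = pending.headMap(nowMillis, true);
        for (List<T> values : due.values()) {
            emitted.addAll(values);
        }
        due.clear(); // clearing the view removes the fired entries from the TreeMap
        return emitted;
    }

    // Number of timers that still hold state.
    int pendingTimers() {
        return pending.size();
    }

    public static void main(String[] args) {
        DelayBuffer<String> buffer = new DelayBuffer<>(15 * 60 * 1000L);
        buffer.add("a", 0L);
        buffer.add("b", 60_000L);
        System.out.println(buffer.advanceTo(900_000L)); // [a]
        System.out.println(buffer.pendingTimers());     // 1
    }
}
```

The point of the model is that state is removed in the same step that emits it, so the amount of retained state is bounded by the number of elements still waiting out their delay, not by the number of keys ever seen.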

An alternative approach would be to use session windows and a GroupByKey-like operation, which would hold the element and emit it at the end of the session, which is exactly what you need. The state of the session window will be cleared in this case as well.

 Jan

On 7/19/21 2:00 PM, Dario Heinisch wrote:

Hey Jan,

No, it isn't a logical constraint. The reason is that there are different kinds of users: some pay for live data, while others want a cheaper version in which the data is delayed.

But what happens if I add a random key (let's say a UUID)? Isn't that bad for performance? Then for every object that is processed I would have a state that is used only once, and I assume Flink wouldn't clean that state up, would it? What happens to the ValueState? Is it still kept in memory? I thought that Flink keeps a state for every key it encounters.

But I think this could be solved with a TTL: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl. I guess I will test that at some point this week! :)

For reference, this would be the code:

[...]
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))

public abstract class Timestamper { public abstract long executedAt(); }

public class DelayedProcessor<T extends Timestamper> extends KeyedProcessFunction<UUID, T, T> implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;

    private ValueState<T> state;

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        ValueStateDescriptor<T> desc = new ValueStateDescriptor<>(stateName, clazz);
        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
        this.state.update(t);

        // The timer takes an absolute timestamp: fire 15 minutes after the
        // element's own timestamp, not at (now + TIMEOUT) - executedAt.
        long fireAt = t.executedAt() + TIMEOUT;

        ctx.timerService().registerEventTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        out.collect(this.state.value());
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}


Best regards,

Dario



On 18.07.21 19:12, Jan Lukavský wrote:

Hi Dario,

Out of curiosity, could you briefly describe the driving use case? What is the (logical) constraint that drives the requirement? I'd guess that it could be related to waiting for some (external) condition, or maybe to late data? I think there might be better approaches than (unconditionally) delaying data in the pipeline. On the other hand, if that really is the best approach, then adding a random key to create a keyed stream should work in all cases, right?

 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:

Hey Kiran,

Yeah, I was thinking of another solution: I have one PostgreSQL sink and one Kafka sink.

So I can just process the data in real time and insert it into the DB. Then I would just select the latest row where created_at >= NOW() - interval '15 minutes', and for any Kafka consumer I would just do:

let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// ....
kafka_commit();

And then run some cron job to delete obsolete rows from the db which are not required anymore.
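
For what it's worth, the wait computed in the consumer pseudocode above could be sketched in Java roughly as follows. ConsumerDelay and remaining are hypothetical names, and the Kafka client calls are left out on purpose; this only shows the "created_at + 15min - now, but never negative" arithmetic.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: how long a consumer still has to wait before a
// message created at createdAt may be handed on to the delayed users.
final class ConsumerDelay {
    // Returns createdAt + delay - now, or zero if the delay has already elapsed.
    static Duration remaining(Instant createdAt, Instant now, Duration delay) {
        Duration diff = Duration.between(now, createdAt.plus(delay));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    public static void main(String[] args) {
        Instant createdAt = Instant.parse("2021-07-18T12:00:00Z");
        Instant now = Instant.parse("2021-07-18T12:10:00Z");
        // Ten minutes have passed, so five minutes of delay remain.
        System.out.println(remaining(createdAt, now, Duration.ofMinutes(15))); // PT5M
    }
}
```

A consumer loop would then sleep for remaining(...).toMillis() before processing and committing the message.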

Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:
Hi Dario,

Did you explore other options? If your use case (apart from delaying sink writes) can be solved via Spark Streaming, then maybe Spark Streaming with a micro-batch interval of 15 minutes would help.



On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch <dario.heini...@gmail.com> wrote:

    Hey there,

    Hope all is well!

    I would like to delay my data by 15 minutes before it arrives at my
    sinks:

    stream()
    .map()
    [....]
    .<DELAY_DATA_FOR_15_MINUTES>
    .print()

    I tried implementing my own ProcessFunction, where Timestamper is a
    custom abstract class:

    public abstract class Timestamper {
         public abstract long executedAt();
    }

    public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

        private final String stateName;
        private final Class<T> clazz;

        // TODO: Should we use ListState, as this is preferred for serialization,
        // or should we use Value<Queue>, which may impact serialization?
        private ListState<T> state;

        private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

        public DelayedProcessor(String stateName, Class<T> clazz) {
            this.stateName = stateName;
            this.clazz = clazz;
        }

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
        }

        @Override
        public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
            // Buffer the element and ask to be called back 15 minutes later.
            this.state.add(t);
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
            List<T> list = new ArrayList<>();
            this.state.get().forEach(list::add);

            long now = System.currentTimeMillis();

            // Emit every element whose delay has elapsed; keep the rest in state.
            list = list.stream().filter(v -> {

                if (v.executedAt() + TIMEOUT <= now) {
                    out.collect(v);
                    return false;
                }

                return true;

            }).collect(Collectors.toList());

            this.state.update(list);
        }
    }

    Unfortunately, this can only be used on a keyed stream, which may
    not always be the case for me.

    One possible solution would be to use:

    .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))

    and then always just take the value with the lowest timestamp, but
    this seems very bad performance-wise, and the state would be very large.
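
To put a rough number on the state-size concern above: with sliding windows, each element is assigned to size / slide overlapping windows, so a 15-minute window sliding every second places every element in 900 windows. The tiny sketch below only does that arithmetic; SlidingWindowCost and windowsPerElement are hypothetical names, and it assumes the window size is a multiple of the slide.

```java
// Back-of-the-envelope helper for the cost of sliding windows.
final class SlidingWindowCost {
    // Number of overlapping sliding windows each element falls into,
    // assuming sizeMillis is a multiple of slideMillis.
    static long windowsPerElement(long sizeMillis, long slideMillis) {
        return sizeMillis / slideMillis;
    }

    public static void main(String[] args) {
        long size = 15 * 60 * 1000L; // 15-minute window
        long slide = 1000L;          // 1-second slide
        System.out.println(windowsPerElement(size, slide)); // 900
    }
}
```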

    Does anyone have a solution for me, or can anyone point me in the
    right direction?

    Best regards,

    Dario
