Hey Jan,

No, it isn't a logical constraint. The reason is that there are different kinds of users: some pay for live data, while others want a cheaper version where the data is delayed.

But what happens if I add a random key (let's say a UUID)? Isn't that bad for performance? For every object being processed I would then have state that is used only once, and I assume Flink wouldn't clean that state up, would it? What happens to the ValueState? Is it still kept in memory? I thought Flink keeps state for every key it encounters.

But I think this could be solved with a TTL: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl. I guess I will test that at some point this week! :)
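To make the state-per-key question concrete, here is a toy model in plain Java. It is not Flink's API or implementation. Each distinct key owns one entry with an expiry time that is reset on every write, loosely mirroring StateTtlConfig.UpdateType.OnCreateAndWrite. Expired entries stay readable and keep occupying memory until a cleanup pass removes them. Flink's actual TTL cleanup strategies are described in the linked documentation, and the class name here is illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed state with a TTL (NOT Flink's implementation): one entry
// per distinct key, each with an expiry time that is (re)set on create/write.
class TtlStateModel<V> {
    private static final class Entry<E> {
        final E value;
        final long expiresAt;

        Entry(E value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry<V>> entries = new HashMap<>();

    TtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Creating or overwriting a key restarts its TTL.
    void update(String key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis + ttlMillis));
    }

    // Returns the stored value, expired or not, until cleanup() removes it.
    V get(String key) {
        Entry<V> e = entries.get(key);
        return e == null ? null : e.value;
    }

    // Cleanup pass: drops every entry whose TTL has elapsed and returns how
    // many were removed. Until it runs, expired entries still use memory.
    int cleanup(long nowMillis) {
        int before = entries.size();
        entries.values().removeIf(e -> e.expiresAt <= nowMillis);
        return before - entries.size();
    }

    int size() {
        return entries.size();
    }
}
```

With a fresh random key per record, the number of entries grows with the number of records until expiry or an explicit clear removes them.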

For reference, this would be the code:

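[...]
// Caution: Flink evaluates the key selector more than once per record (when routing
// it and again inside the keyed operator), so it must be deterministic. A new random
// UUID per call can fail once parallelism > 1; assigning the UUID once in an
// upstream map() and keying on that field avoids this.
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))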

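import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public abstract class Timestamper { public abstract long executedAt(); }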

public class DelayedProcessor<T extends Timestamper> extends KeyedProcessFunction<UUID, T, T> implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;

    private ValueState<T> state;

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {

        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        ValueStateDescriptor<T> desc = new ValueStateDescriptor<>(stateName, clazz);
        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
        this.state.update(t);

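        // Fire once the record is TIMEOUT old by wall-clock time, assuming
        // executedAt() returns epoch milliseconds. The timer takes an absolute
        // timestamp, not a duration, and a wall-clock deadline needs a
        // processing-time timer rather than an event-time one.
        ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);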
    }

    @Override
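    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        T value = this.state.value();
        if (value != null) {
            out.collect(value);
        }
        // Each random key is used only once, so drop its state right after emitting.
        this.state.clear();
    }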

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}
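The timer logic can be sanity-checked in plain Java without a Flink dependency. In this model, a priority queue stands in for the timer service, and fireTime() returns the absolute instant executedAt + TIMEOUT. relativeFormula() shows that a value computed as `(now + TIMEOUT) - executedAt` is a duration of roughly TIMEOUT, which read as a timestamp falls in January 1970, rather than the instant the record becomes due. This sketch assumes executedAt is epoch milliseconds and does not reproduce Flink's timer implementation. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Plain-Java model of the delay logic, no Flink types: each record waits until
// executedAt + TIMEOUT (wall clock), is emitted once, and is then forgotten.
class DelaySimulation {
    static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    private static final class Pending {
        final long fireAt;
        final String payload;

        Pending(long fireAt, String payload) {
            this.fireAt = fireAt;
            this.payload = payload;
        }
    }

    // Plays the role of the timer service: earliest fire time first.
    private final PriorityQueue<Pending> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.fireAt, b.fireAt));

    // The value to hand to the timer service: an absolute timestamp.
    static long fireTime(long executedAt) {
        return executedAt + TIMEOUT;
    }

    // A value computed this way is a duration (about TIMEOUT), i.e. an
    // instant in January 1970 when interpreted as a timestamp.
    static long relativeFormula(long now, long executedAt) {
        return (now + TIMEOUT) - executedAt;
    }

    void processElement(String payload, long executedAt) {
        timers.add(new Pending(fireTime(executedAt), payload));
    }

    // Moves the clock forward and emits every record whose timer is due,
    // removing it (the model's counterpart of state.clear() in onTimer()).
    List<String> advanceTo(long nowMillis) {
        List<String> out = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek().fireAt <= nowMillis) {
            out.add(timers.poll().payload);
        }
        return out;
    }

    int pending() {
        return timers.size();
    }
}
```

Removing each record as soon as it is emitted is what keeps per-key state from piling up when every key is used only once.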


Best regards,

Dario



On 18.07.21 19:12, Jan Lukavský wrote:

Hi Dario,

out of curiosity, could you briefly describe the driving use case? What is the (logical) constraint that drives the requirement? I'd guess it could be related to waiting for some (external) condition, or maybe to late data? I think there might be better approaches than (unconditionally) delaying data in the pipeline. On the other hand, if that really is the best approach, then adding a random key to create a keyed stream should work in all cases, right?

 Jan
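One detail matters for the random-key idea. Flink applies the key selector when it routes a record to a parallel instance and again inside the keyed operator, so a selector returning UUID.randomUUID() can produce two different keys for the same record. One workaround is to assign the random UUID once in an upstream map() and key on that stored field. Another is to derive the key from the record itself. The following is a minimal plain-Java sketch of the latter, where `recordContent` is a hypothetical stable string form of the record and the class name is illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper: derives a UUID from a stable string form of the record,
// so repeated calls for the same record always produce the same key.
class DeterministicKeys {
    static UUID keyFor(String recordContent) {
        // Name-based (type 3) UUID: same input bytes, same UUID.
        return UUID.nameUUIDFromBytes(recordContent.getBytes(StandardCharsets.UTF_8));
    }
}
```

Identical records map to the same key. With a ValueState, the later record would overwrite the earlier one, while a ListState per key, as in the first DelayedProcessor, would keep both.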

On 7/18/21 3:52 PM, Dario Heinisch wrote:

Hey Kiran,

Yeah, I was thinking of another solution: I have one PostgreSQL sink and one Kafka sink.

So I can just process the data in real time and insert it into the DB. For the delayed data I would then select the latest row where created_at <= NOW() - interval '15 minutes', and for any Kafka consumer I would just do:

let msg = get_next_kafka_msg();
let diff = msg.created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
// ....
kafka_commit();

And then run a cron job to delete obsolete rows from the DB that are no longer required.
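The delay arithmetic behind this approach can be sketched in plain Java. The sketch covers how long a consumer should sleep for a message created at `createdAt`, and which cutoff the delayed DB query uses. The class and method names are illustrative, and `createdAt` stands in for the created_at column or message field.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the delay arithmetic for the DB + Kafka approach (names illustrative).
class DelayedConsumerMath {
    static final Duration DELAY = Duration.ofMinutes(15);

    // How long to sleep before handling a message created at createdAt:
    // zero once the message is already DELAY old.
    static Duration remainingDelay(Instant createdAt, Instant now) {
        Duration diff = Duration.between(now, createdAt.plus(DELAY));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    // Rows with created_at <= this instant are old enough for delayed users.
    static Instant visibilityCutoff(Instant now) {
        return now.minus(DELAY);
    }

    // Blocking wait a consumer loop could call before processing a message.
    static void waitUntilDue(Instant createdAt) throws InterruptedException {
        Duration remaining = remainingDelay(createdAt, Instant.now());
        if (!remaining.isZero()) {
            Thread.sleep(remaining.toMillis());
        }
    }
}
```

One caveat if the real consumer belongs to a Kafka consumer group: sleeping up to 15 minutes between polls can exceed max.poll.interval.ms (300000 ms, i.e. 5 minutes, by default). After that, the consumer is considered failed and the group rebalances. Raising that setting, or pausing and resuming partitions while still polling, are the usual ways around it.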

Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:
Hi Dario,

Did you explore other options? If your use case (apart from delaying the sink writes) can be solved with Spark Streaming, then maybe Spark Streaming with a 15-minute micro-batch would help.
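A micro-batch does not delay every record by the same amount, which matters for the "delayed by 15 minutes" requirement. This is a minimal plain-Java sketch under a simplified model: a batch runs exactly at each 15-minute boundary and processing takes no time. Real Spark Streaming scheduling adds its own latency, and the class name is illustrative.

```java
import java.time.Duration;

// Simplified model: one batch runs exactly at each interval boundary and
// processes everything that arrived during the preceding interval.
class MicroBatchWait {
    static final long INTERVAL_MILLIS = Duration.ofMinutes(15).toMillis();

    // End of the interval an arrival falls into, i.e. when its batch runs.
    static long batchRunsAt(long arrivalMillis) {
        return (Math.floorDiv(arrivalMillis, INTERVAL_MILLIS) + 1) * INTERVAL_MILLIS;
    }

    // How long a record waits before its batch runs.
    static long waitMillis(long arrivalMillis) {
        return batchRunsAt(arrivalMillis) - arrivalMillis;
    }
}
```

In this model, a record arriving right at a boundary waits the full 15 minutes, while one arriving just before the next boundary waits almost nothing. Delayed users would therefore see data anywhere between roughly 0 and 15 minutes old.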



On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch <dario.heini...@gmail.com> wrote:

    Hey there,

    Hope all is well!

    I would like to delay my data by 15 minutes before it arrives at my sinks:

    stream()
    .map()
    [....]
    .<DELAY_DATA_FOR_15_MINUTES>
    .print()

    I tried implementing my own ProcessFunction, where Timestamper is a custom abstract class:

    public abstract class Timestamper {
         public abstract long executedAt();
    }

    public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

        private final String stateName;
        private final Class<T> clazz;

        // TODO: Should we use ListState, as it is preferred for serialization,
        //  or ValueState<Queue<T>>, which may impact serialization?
        private ListState<T> state;

        private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

        public DelayedProcessor(String stateName, Class<T> clazz) {
            this.stateName = stateName;
            this.clazz = clazz;
        }

        @Override
        public void open(Configuration parameters) {
            // getListState() returns keyed state, hence the keyed-stream requirement.
            state = getRuntimeContext().getListState(new ListStateDescriptor<>(stateName, clazz));
        }

        @Override
        public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
            this.state.add(t);
            // onTimer() compares executedAt() against the wall clock, so register a
            // processing-time timer for the same instant (an event-time timer fires
            // when the watermark passes, which is unrelated to the wall clock).
            ctx.timerService().registerProcessingTimeTimer(t.executedAt() + TIMEOUT);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
            long now = System.currentTimeMillis();
            List<T> remaining = new ArrayList<>();

            // Emit every element whose delay has elapsed and keep the rest buffered.
            for (T v : this.state.get()) {
                if (v.executedAt() + TIMEOUT <= now) {
                    out.collect(v);
                } else {
                    remaining.add(v);
                }
            }

            this.state.update(remaining);
        }
    }

    Unfortunately, this can only be used on a keyed stream, which may not always be the case for me.

    One possible solution would be to use:

    .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))

    and then always just take the value with the lowest timestamp, but this seems very bad performance-wise and the state would be very large.

    Does anyone have a solution for me or can point me in the right direction?

    Best regards,

    Dario
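The performance concern with the windowAll alternative in the first message can be quantified. A sliding window with a size of 15 minutes and a slide of 1 second assigns each element to 15 min / 1 s = 900 overlapping windows, and windowAll runs as a single, non-parallel operator. The following plain-Java sketch shows that arithmetic; the class name is illustrative.

```java
import java.time.Duration;

// Arithmetic behind the windowAll idea: with sliding windows, every element is
// assigned to size / slide overlapping windows (size a multiple of slide).
class SlidingWindowCost {
    static long windowsPerElement(Duration size, Duration slide) {
        return size.toMillis() / slide.toMillis();
    }

    // Total window assignments over the lifetime of `elements` records.
    static long totalAssignments(long elements, Duration size, Duration slide) {
        return elements * windowsPerElement(size, slide);
    }
}
```

So 1,000 input records lead to 900,000 window assignments. If the window function buffers raw elements rather than aggregating incrementally, each of those windows keeps its own copy in state.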
