Watermark UI after checkpoint failure
After my dev Flink job hits a checkpoint failure (e.g. a timeout) and then completes subsequent checkpoints successfully, the job appears to be in a bad state. For example, some operators that previously showed a watermark start showing "no watermark", and the job progresses very slowly. Is this state documented anywhere? It seems odd to me that operators would stop showing watermarks.
Re: Delay data elements in pipeline by X minutes
Hi Dario,

Out of curiosity, could you briefly describe the driving use case? What (logical) constraint drives the requirement? I'd guess it could be related to waiting for some (external) condition, or maybe to late data? I think there might be better approaches than (unconditionally) delaying data in the pipeline. On the other hand, if that really is the best approach, then adding a random key to create a keyed stream should work in all cases, right?

Jan
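Jan's random-key idea can be sketched without Flink. The names `Keyed`, `RandomKeyAssigner` and `numBuckets` are hypothetical. One assumption is worth stating: the random key should be drawn once and stored on the element (for example in a `map()` before `keyBy`) rather than generated inside the key selector, because, as far as I understand, Flink may evaluate the key selector more than once per record and expects it to be deterministic.

```java
import java.util.Random;

/** Hypothetical wrapper that pairs an element with a bucket key drawn once. */
final class Keyed<T> {
    private final int bucket;
    private final T value;

    Keyed(int bucket, T value) {
        this.bucket = bucket;
        this.value = value;
    }

    /** Stable key: reading it repeatedly always returns the same bucket. */
    int bucket() {
        return bucket;
    }

    T value() {
        return value;
    }
}

/** Hypothetical helper that spreads elements over numBuckets random keys. */
final class RandomKeyAssigner {
    private final int numBuckets;
    private final Random random;

    RandomKeyAssigner(int numBuckets, long seed) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive");
        }
        this.numBuckets = numBuckets;
        this.random = new Random(seed);
    }

    /** Draws a bucket in [0, numBuckets) once and stores it with the element. */
    <T> Keyed<T> assign(T value) {
        return new Keyed<>(random.nextInt(numBuckets), value);
    }
}
```

In a job, the wrapped stream would then be keyed on the stored bucket, so the delay logic gets keyed state and timers per bucket. Since buckets are hashed to key groups, a `numBuckets` somewhat larger than the parallelism is probably needed for an even spread.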
Unsubscribe
Unsubscribe. Best wishes: may your work go smoothly and may everything go as you wish!
Re: Delay data elements in pipeline by X minutes
Hey Kiran,

Yeah, I was thinking of another solution. I have one PostgreSQL sink and one Kafka sink, so I can just process the data in real time and insert it into the DB. Then I would select the latest row where created_at <= NOW() - interval '15 minutes', and for any Kafka consumer I would do:

    let msg = get_next_kafka_msg();
    let diff = created_at + 15min - now();
    if diff > 0 {
        sleep(diff)
    }
    // do something
    kafka_commit();

And then I'd run a cron job to delete obsolete rows from the DB that are no longer required.

Best regards
Dario
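Dario's consumer-side pseudocode computes how long to wait before handling a message. A minimal Java sketch of that step using only `java.time`; `DelayedConsumption`, `remainingDelay` and `waitUntilDue` are hypothetical names, the Kafka fetch and commit calls are deliberately left out, and it assumes `created_at` comes from a clock reasonably in sync with the consumer's.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helpers for the consumer-side delay; Kafka calls are omitted. */
final class DelayedConsumption {
    private DelayedConsumption() {}

    /** Returns createdAt + delay - now, clamped at zero when already due. */
    static Duration remainingDelay(Instant createdAt, Duration delay, Instant now) {
        Duration diff = Duration.between(now, createdAt.plus(delay));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    /**
     * Sleeps until createdAt + delay has passed on this machine's clock.
     * Thread.sleep takes whole milliseconds, so a sub-millisecond remainder is dropped.
     */
    static void waitUntilDue(Instant createdAt, Duration delay) throws InterruptedException {
        Duration remaining = remainingDelay(createdAt, delay, Instant.now());
        if (!remaining.isZero()) {
            Thread.sleep(remaining.toMillis());
        }
    }
}
```

A consumer loop would call `waitUntilDue(createdAt, Duration.ofMinutes(15))` after fetching a message and before processing it and committing the offset, mirroring the order in the pseudocode.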
Re: Delay data elements in pipeline by X minutes
Hi Dario,

Did you explore other options? If your use case (apart from delaying sink writes) can be solved with Spark Streaming, then Spark Streaming with a 15-minute micro-batch might help.

On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch wrote:
> Hey there,
>
> Hope all is well!
>
> I would like to delay my data by 15 minutes before it arrives at my
> sinks:
>
> stream()
>     .map()
>     []
>     .
>     .print()
>
> I tried implementing my own ProcessFunction, where Timestamper is a
> custom abstract class:
>
> public abstract class Timestamper {
>     public abstract long executedAt();
> }
>
> public class DelayedProcessor<T extends Timestamper>
>         extends ProcessFunction<T, T> {
>
>     private final String stateName;
>     private final Class<T> clazz;
>
>     // TODO: Should we do ListState as this is being preferred for serialization
>     // or should we do Value but this may impact serialization.
>     private ListState<T> state;
>
>     private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);
>
>     public DelayedProcessor(String stateName, Class<T> clazz) {
>         this.stateName = stateName;
>         this.clazz = clazz;
>     }
>
>     @Override
>     public void open(Configuration parameters) {
>         state = getRuntimeContext().getListState(
>                 new ListStateDescriptor<>(stateName, clazz));
>     }
>
>     @Override
>     public void processElement(T t, Context ctx, Collector<T> collector) throws Exception {
>         this.state.add(t);
>         ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
>         List<T> list = new ArrayList<>();
>         this.state.get().forEach(list::add);
>
>         long now = System.currentTimeMillis();
>
>         list = list.stream().filter(v -> {
>             if (v.executedAt() + TIMEOUT <= now) {
>                 out.collect(v);
>                 return false;
>             }
>             return true;
>         }).collect(Collectors.toList());
>
>         this.state.update(list);
>     }
> }
>
> Unfortunately, this can only be used on a keyed stream, which may not
> always be the case for me.
>
> One possible solution would be to use:
>
> .windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
>
> and then always just take the value with the lowest timestamp, but this
> seems very bad performance-wise and the state would be very large.
>
> Does anyone have a solution for me or can point me in the right direction?
>
> Best regards,
>
> Dario
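The release step in Dario's `onTimer` can be isolated and tested without Flink. The sketch below uses a hypothetical helper, `DelayBuffer.release`, that mirrors his filter with one deliberate change, labeled as an assumption: it compares each element's due time against the firing timer's timestamp instead of `System.currentTimeMillis()`, so an event-time timer is judged in event time. That only matches his intent if `executedAt()` is the element's event timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical, Flink-free version of the onTimer release logic. */
final class DelayBuffer {
    private DelayBuffer() {}

    /**
     * Moves every element whose timestamp + delayMillis is at or before
     * firingTime into emitted (standing in for out.collect), and returns
     * the remaining elements, in their original order, to be written back to state.
     */
    static <T> List<T> release(List<T> buffered, ToLongFunction<T> timestampOf,
                               long delayMillis, long firingTime, List<T> emitted) {
        List<T> retained = new ArrayList<>();
        for (T element : buffered) {
            if (timestampOf.applyAsLong(element) + delayMillis <= firingTime) {
                emitted.add(element);
            } else {
                retained.add(element);
            }
        }
        return retained;
    }
}
```

In `DelayedProcessor.onTimer`, `buffered` corresponds to the list read from `state`, `emitted` to the `out.collect(v)` calls, the timestamp function to `Timestamper::executedAt`, and the returned list to `state.update(list)`.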