Watermark UI after checkpoint failure

2021-07-18 Thread Dan Hill
After my dev Flink job hits a checkpoint failure (e.g. a timeout) and then
has successful checkpoints, the job appears to be in a bad state.
E.g. some of the operators that previously had a watermark start showing
"no watermark", and the job proceeds very slowly.

Is there documentation for this state?  It seems weird to me that operators
would not show watermarks anymore.


Re: Delay data elements in pipeline by X minutes

2021-07-18 Thread Jan Lukavský

Hi Dario,

out of curiosity, could you briefly describe the driving use case? What 
is the (logical) constraint that drives the requirement? I'd guess it 
could be related to waiting for some (external) condition, or maybe to 
late data? I think there might be better approaches than 
(unconditionally) delaying data in the pipeline. On the other hand, if 
that really is the best approach, then adding a random key to create a 
keyed stream should work in all cases, right?
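A minimal plain-Java sketch of that random-key idea (illustrative only, no
Flink dependencies; the wrapper name and the key-space size of 128 are
assumptions, not from the thread). The key is drawn once and stored on the
element, because a Flink key selector must return the same key for the same
element, so the key has to travel with it:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RandomKeyed<T> {
    // Assumption: 128 comfortably exceeds the job's parallelism,
    // so load spreads across all subtasks.
    static final int KEY_SPACE = 128;

    final int key;   // drawn once and stored, never re-computed
    final T value;

    RandomKeyed(T value) {
        this.key = ThreadLocalRandom.current().nextInt(KEY_SPACE);
        this.value = value;
    }
}
```

In the pipeline this would look roughly like
`stream.map(RandomKeyed::new).keyBy(r -> r.key)` before the keyed process
function, after which keyed state and timers are available.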


 Jan

On 7/18/21 3:52 PM, Dario Heinisch wrote:


Hey Kiran,

Yeah, I was thinking of another solution: I have one PostgreSQL sink & 
one Kafka sink.


So I can just process the data in real time and insert it into the DB. 
Then I would just select the latest rows where created_at >= NOW() - 
interval '15 minutes', and every Kafka consumer would just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
kafka_commit();

And then run some cron job to delete obsolete rows from the DB that 
are no longer required.
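The wait computation in that consumer pseudocode can be sketched in plain
Java (illustrative names; `DelayGate` and `sleepFor` are assumptions, not
from the thread): the consumer sleeps for whatever remains of the 15-minute
window, or not at all if the record is already old enough.

```java
import java.time.Duration;
import java.time.Instant;

public class DelayGate {
    static final Duration DELAY = Duration.ofMinutes(15);

    // How long the consumer should sleep before handling a record created
    // at `createdAt`, given the current time `now`; zero if the record is
    // already older than the delay.
    static Duration sleepFor(Instant createdAt, Instant now) {
        Duration remaining = Duration.between(now, createdAt.plus(DELAY));
        return remaining.isNegative() ? Duration.ZERO : remaining;
    }
}
```

E.g. 10 minutes after creation this returns a 5-minute sleep; 20 minutes
after creation it returns zero, so the message is processed immediately.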


Best regards

Dario

On 18.07.21 15:29, Kiran Japannavar wrote:

Hi Dario,

Did you explore other options? If your use case (apart from delaying 
sink writes) can be solved via Spark Streaming, then maybe Spark 
Streaming with a micro-batch of 15 minutes would help.




On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch 
<dario.heini...@gmail.com> wrote:


Hey there,

Hope all is well!

I would like to delay the data by 15 minutes before it arrives at my
sinks:

stream()
    .map()
    ...
    .print()

I tried implementing my own ProcessFunction, where Timestamper is a
custom abstract class:

public abstract class Timestamper {
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper> extends
ProcessFunction<T, T> {

    private final String stateName;
    private final Class<T> clazz;

    // TODO: Should we do ListState<T> as this is being preferred for
    //       serialization, or should we do ValueState<List<T>> but this
    //       may impact serialization.
    private ListState<T> state;

    private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getListState(
            new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector)
            throws Exception {
        this.state.add(t);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out)
            throws Exception {
        List<T> list = new ArrayList<>();
        this.state.get().forEach(list::add);

        final long now = System.currentTimeMillis();

        list = list.stream().filter(v -> {
            if (v.executedAt() + TIMEOUT <= now) {
                out.collect(v);
                return false;
            }
            return true;
        }).collect(Collectors.toList());

        this.state.update(list);
    }
}
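The buffer-and-release step in onTimer can be exercised in isolation with a
plain-Java mimic (illustrative, no Flink dependencies; `ReleaseLogic` and
`release` are assumed names). Timestamps older than TIMEOUT are released
downstream and dropped from state; younger ones stay buffered for a later
timer:

```java
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ReleaseLogic {
    static final long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    // Mirrors the filter in onTimer: elements older than TIMEOUT are moved
    // to `out` (out.collect(v) in the real operator) and removed from the
    // buffer; the returned list is what would be written back to state.
    static List<Long> release(List<Long> buffered, long now, List<Long> out) {
        return buffered.stream().filter(ts -> {
            if (ts + TIMEOUT <= now) {
                out.add(ts);   // released downstream
                return false;  // dropped from state
            }
            return true;       // kept buffered
        }).collect(Collectors.toList());
    }
}
```

For a buffer holding timestamps 0 and TIMEOUT, a timer firing at
`now = TIMEOUT` releases only the first element and keeps the second.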

Unfortunately, this can only be used on a keyed stream, which may not
always be the case for me.

One possible solution would be to use:

.windowAll(SlidingEventTimeWindows.of(Time.minutes(15),
Time.seconds(1)))

and then always just take the value with the lowest timestamp, but this
seems very bad performance-wise and the state would be very large.

Does anyone have a solution for me, or can you point me in the right
direction?

Best regards,

Dario



Unsubscribe

2021-07-18 Thread tiankai...@gmail.com
Unsubscribe


Wishing you smooth work and all the best!


Re: Delay data elements in pipeline by X minutes

2021-07-18 Thread Dario Heinisch

Hey Kiran,

Yeah, I was thinking of another solution: I have one PostgreSQL sink & 
one kafka sink.


So I can just process the data in real time and insert it into the DB. 
Then I would just select the latest rows where created_at >= NOW() - 
interval '15 minutes', and every Kafka consumer would just do:


let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
    sleep(diff)
}
// do something
kafka_commit();

And then run some cron job to delete obsolete rows from the DB that 
are no longer required.


Best regards

Dario




Re: Delay data elements in pipeline by X minutes

2021-07-18 Thread Kiran Japannavar
Hi Dario,

Did you explore other options? If your use case (apart from delaying sink
writes) can be solved via Spark Streaming, then maybe Spark Streaming with
a micro-batch of 15 minutes would help.


