Thanks Fabian, Kostas,
Here is what I had in the Trigger - idea is to run bitwise OR until a threshold 
is reached or a timeout is reached (nothing too fancy here). Let me know what 
you guys think. Like I said, I moved this logic to Process Function and I 
haven't seen the same issue I was with this. 

@PublicEvolvingpublic class BitwiseOrTrigger<W extends Window> extends 
Trigger<FactoredEvent, W> { private static final long serialVersionUID = 1L; 
private final int threshold; private final long epocDelta; private final 
ReducingStateDescriptor<Tuple2<Integer, Long>> stateDesc =  new 
ReducingStateDescriptor<>("bitwiseOr", new BitwiseOr(), TypeInformation.of(new 
TypeHint<Tuple2<Integer,Long>>() {}));

 private BitwiseOrTrigger(int threshold, long allowedLateness) { this.threshold 
= threshold; this.epocDelta = allowedLateness; }
 @Override public TriggerResult onElement(FactoredEvent event, long timestamp, 
W window, TriggerContext ctx) throws Exception { 
ReducingState<Tuple2<Integer,Long>> currState = 
ctx.getPartitionedState(stateDesc); if (this.epocDelta>0) { 
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + this.epocDelta); } 
currState.add(new Tuple2<Integer,Long>(event.getFactor(), this.epocDelta)); if 
(currState.get().f0 >= threshold) { currState.clear(); return 
TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; }
 @Override public TriggerResult onEventTime(long time, W window, TriggerContext 
ctx) { return TriggerResult.FIRE_AND_PURGE; }
 @Override public TriggerResult onProcessingTime(long time, W window, 
TriggerContext ctx) throws Exception { return TriggerResult.FIRE_AND_PURGE; }
 @Override public void clear(W window, TriggerContext ctx) throws Exception { 
ctx.getPartitionedState(stateDesc).clear(); }
 @Override public boolean canMerge() { return true; }
 @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { 
ctx.mergePartitionedState(stateDesc); }
 @Override public String toString() { return "BitwiseOrTrigger(" +  threshold + 
")"; }
 public static <W extends Window> BitwiseOrTrigger<W> of(int threshold, long 
expirationEpoc) { return new BitwiseOrTrigger<>(threshold, expirationEpoc); }
 private static class BitwiseOr implements ReduceFunction<Tuple2<Integer, 
Long>> { private static final long serialVersionUID = 1L;
 @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> tup1, 
Tuple2<Integer, Long> tup2) throws Exception { Tuple2<Integer, Long> retTup = 
tup1; retTup.f0 = tup1.f0 | tup2.f0; return retTup; }
 }}

    On Monday, May 14, 2018, 6:00:11 AM EDT, Fabian Hueske <fhue...@gmail.com> 
wrote:  
 
 Hi Ashish,

Did you use per-window state (also called partitioned state) in your Trigger? 
If yes, you need to make sure that it is completely removed in the clear() 
method because processing time timers won't fire once a window was purged. 
So you cannot (fully) rely on timers to clean up per-window state.

Best, Fabian

2018-05-14 9:34 GMT+02:00 Kostas Kloudas <k.klou...@data-artisans.com>:

Hi Ashish,
It would be helpful to share the code of your custom trigger for the first 
case.Without that, we cannot tell what state you create and how/when you 
update/clear it.
Cheers,Kostas

On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
 Hi Till,
Thanks for getting back. I am sure that will fix the issue but I feel like that 
would potentially mask an issue. I have been going back and forth with Fabian 
on a use case where for some of our highly transient datasets, it might make 
sense to just use memory based state (except of course data loss becomes an 
issue when apps occasionally hit a problem and whole job restarts or app has to 
be taken down etc - ie. handling graceful shutdowns / restarts better 
essentially). I was on the hook to create a business case and post it back to 
this forum (which I am hoping I can get around to at some point soon). Long 
story short, this is one of those datasets. 
States in this case are either fired and cleared normally or on processing 
timeout. So technically, unless there is a memory leak in app code, memory 
usage should plateau out at a high-point. What I was noticing was memory would 
start to creep up ever so slowly. 
I couldn't tell exactly why heap utilization kept on growing (ever so slowly 
but it had upward trend for sure) because the states should technically be 
cleared if not as part of a reducing function then on timeout. App after 
running for couple of days would then run into Java Heap issues. So changing to 
RocksDB probably will fix the issue but not necessarily leak of states that 
should be cleared IMO. Interestingly, I switched my app from using something 
like this:
WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = 
statsStream          .keyBy(BasicFactTuple::getKey)          
.window(GlobalWindows.create() )          .trigger(BitwiseOrTrigger.of( 60, 
AppConfigs.getWindowSize(5*60* 1000)))          ;
To 
 DataStream<PlatformEvent> processStats = pipStatsStream          
.keyBy(BasicFactTuple::getKey)          .process(new IfStatsReduceProcessFn( 
AppConfigs.getWindowSize(5*60* 1000), 60))
I basically moved logic of trigger to process function over the weekend. Once I 
did that, heap is completely stabilized. In trigger implementation, I was using 
FIRE_AND_PURGE on trigger condition or onProcessingTime and in process 
implementation I am using .clear() method for same. 
I seem to have solved the problem by using process but I'd be interested to 
understand the cause of why heap would creep up in trigger scenario. 
Hope this makes sense,
Ashish
    On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann 
<till.rohrm...@gmail.com> wrote:  
 
 Hi Ashish,
have you tried using Flink's RocksDBStateBackend? If your job accumulates state 
exceeding the available main memory, then you have to use a state backend which 
can spill to disk. The RocksDBStateBackend offers you exactly this 
functionality.
Cheers,Till
On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com> wrote:

All,
I am using noticing heap utilization creeping up slowly in couple of apps which 
eventually lead to OOM issue. Apps only have 1 process function that cache 
state. I did make sure I have a clear method invoked when events are collected 
normally, on exception and on timeout.
Are any other best practices others follow for memory backed states?
Thanks,

-- Ashish

  



  

Reply via email to