[ https://issues.apache.org/jira/browse/FLINK-32701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz reassigned FLINK-32701:
----------------------------------------

    Assignee: Puneet Duggal

> Potential Memory Leak in Flink CEP due to Persistent Starting States in 
> NFAState
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32701
>                 URL: https://issues.apache.org/jira/browse/FLINK-32701
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.17.0, 1.16.1, 1.16.2, 1.17.1
>            Reporter: Puneet Duggal
>            Assignee: Puneet Duggal
>            Priority: Major
>              Labels: CEP, auto-deprioritized-critical, cep
>         Attachments: Screenshot 2023-07-26 at 11.45.06 AM.png, Screenshot 
> 2023-07-26 at 11.50.28 AM.png
>
>
> Our team has encountered a potential memory leak issue while working with the 
> Complex Event Processing (CEP) library in Flink v1.17.
> h2. Context
> The CEP operator maintains a keyed state called NFAState, which holds two 
> queues: one for partial matches and one for completed matches. When a key is 
> first encountered, the operator creates a starting computation state and 
> stores it in the partial matches queue. As more events match the defined 
> conditions (e.g., a TAKE condition), additional computation states are added 
> to the queue, with their specific type (normal, pending, end) depending on 
> the pattern sequence.
> However, I have noticed that the starting computation state remains in the 
> partial matches queue even after the pattern sequence has been completely 
> matched. This is also the case for keys that have already timed out. As a 
> result, the state gets stored for all keys that the CEP ever encounters, 
> leading to a continual increase in the checkpoint size.
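> For illustration, here is a minimal sketch using the CEP-internal classes 
> (NFA, NFACompiler, NFAState; these are internal APIs and may change between 
> versions) that shows how the NFAState created when a key is first seen 
> already carries a starting computation state with a start timestamp of -1. 
> That entry is exactly what appears to linger in the partial matches queue:
> {code:java}
> import org.apache.flink.cep.nfa.NFA;
> import org.apache.flink.cep.nfa.NFAState;
> import org.apache.flink.cep.nfa.compiler.NFACompiler;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> 
> public class StartStateSketch {
>     public static void main(String[] args) {
>         // Trivial single-state pattern over plain strings, only to build an NFA.
>         Pattern<String, ?> pattern =
>                 Pattern.<String>begin("A")
>                         .where(new SimpleCondition<String>() {
>                             @Override
>                             public boolean filter(String value) {
>                                 return value.startsWith("A");
>                             }
>                         });
> 
>         NFA<String> nfa = NFACompiler.compileFactory(pattern, false).createNFA();
> 
>         // The NFAState handed out for a newly seen key already contains one
>         // starting ComputationState per pattern start state; its start
>         // timestamp is -1 because no event has been consumed yet.
>         NFAState freshState = nfa.createInitialNFAState();
>         freshState.getPartialMatches()
>                 .forEach(cs -> System.out.println(
>                         cs.getCurrentStateName() + " startTimestamp=" + cs.getStartTimestamp()));
>     }
> }
> {code}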
> h2. How to reproduce this
>  # Pattern Sequence - A not_followed_by B within 5 mins (sketched below)
>  # Time Characteristic - EventTime
>  # StateBackend - HashMapStateBackend
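> For reference, the setup above roughly corresponds to a pipeline like the 
> following minimal sketch. The Event POJO, key extractor and source are 
> placeholders, and watermark assignment is omitted; only the pattern shape 
> (A not_followed_by B within 5 minutes), the 2-minute checkpoint interval and 
> the HashMapStateBackend mirror the actual setup.
> {code:java}
> import java.util.List;
> import java.util.Map;
> import org.apache.flink.cep.CEP;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.PatternStream;
> import org.apache.flink.cep.pattern.Pattern;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
> import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.windowing.time.Time;
> 
> public class NotFollowedByRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStateBackend(new HashMapStateBackend());
>         env.enableCheckpointing(120_000); // 2-minute checkpoint interval
> 
>         // Placeholder source; the real pipeline consumes ~10 "A" events per second.
>         DataStream<Event> events = env.fromElements(new Event(1L, "A"));
> 
>         // A not_followed_by B within 5 minutes.
>         Pattern<Event, ?> pattern =
>                 Pattern.<Event>begin("A")
>                         .where(new SimpleCondition<Event>() {
>                             @Override
>                             public boolean filter(Event e) {
>                                 return "A".equals(e.type);
>                             }
>                         })
>                         .notFollowedBy("B")
>                         .where(new SimpleCondition<Event>() {
>                             @Override
>                             public boolean filter(Event e) {
>                                 return "B".equals(e.type);
>                             }
>                         })
>                         .within(Time.minutes(5));
> 
>         PatternStream<Event> matches = CEP.pattern(events.keyBy(e -> e.key), pattern);
>         matches.select(new PatternSelectFunction<Event, String>() {
>             @Override
>             public String select(Map<String, List<Event>> match) {
>                 return "matched key=" + match.get("A").get(0).key;
>             }
>         }).print();
> 
>         env.execute("cep-not-followed-by-repro");
>     }
> 
>     /** Minimal placeholder event type. */
>     public static class Event {
>         public long key;
>         public String type;
>         public Event() {}
>         public Event(long key, String type) { this.key = key; this.type = type; }
>     }
> }
> {code}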
> On my local machine, I started this pipeline and began sending events at a 
> rate of 10 events per second (only A events). As expected, after 5 minutes 
> the CEP operator started emitting pattern-matched output at the same rate. 
> The issue, however, was that with every checkpoint (2-minute interval) the 
> checkpoint size kept increasing. The expectation was that after 5 minutes 
> (2-3 checkpoints) the checkpoint size would remain roughly constant, since 
> any 5-minute window contains the same number of unique keys (older keys get 
> matched or timed out and should therefore be removed from state). But as the 
> attached images show, the checkpoint size kept increasing up to checkpoint 40 
> (around 1.5 hours).
> P.S. - After 3 checkpoints (6 minutes), the checkpoint size was around 
> 1.78 MB. Hence the assumption is that the steady-state checkpoint size for a 
> 5-minute window should stay below roughly 1.78 MB.
> As you can see, after 39 checkpoints I triggered a savepoint for this 
> pipeline. I then used a savepoint reader to investigate what is being stored 
> in the CEP state. The code below inspects the NFAState of the CepOperator 
> for the potential memory leak.
> {code:java}
> import lombok.AllArgsConstructor;
> import lombok.Data;
> import lombok.NoArgsConstructor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.cep.nfa.NFAState;
> import org.apache.flink.cep.nfa.NFAStateSerializer;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.state.api.OperatorIdentifier;
> import org.apache.flink.state.api.SavepointReader;
> import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import org.junit.jupiter.api.Test;
> import java.io.Serializable;
> import java.util.Objects;
> public class NFAStateReaderTest {
>     private static final String NFA_STATE_NAME = "nfaStateName";
>     @Test
>     public void testNfaStateReader() throws Exception {
>         StreamExecutionEnvironment environment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         SavepointReader savepointReader =
>                 SavepointReader.read(environment, 
> "file:///opt/flink/savepoints/savepoint-093404-9bc0a38654df", new 
> FsStateBackend("file:///abc"));
>         DataStream<NFAStateOutput> stream = 
> savepointReader.readKeyedState(OperatorIdentifier.forUid("select_pattern_events"),
>  new NFAStateReaderTest.NFAStateReaderFunction());
>         stream.print();
>         environment.execute();
>     }
>     static class NFAStateReaderFunction extends 
> KeyedStateReaderFunction<DynamicTuple, NFAStateOutput> {
>         private ValueState<NFAState> computationStates;
>         // Static running aggregates across all keys (single-parallelism test).
>         private static Long danglingNfaCount = 0L;
>         private static Long newNfaCount = 0L;
>         private static Long minTimestamp = Long.MAX_VALUE;
>         private static Long minKeyForCurrentNfa = Long.MAX_VALUE;
>         private static Long minKeyForDanglingNfa = Long.MAX_VALUE;
>         private static Long maxKeyForDanglingNfa = Long.MIN_VALUE;
>         private static Long maxKeyForCurrentNfa = Long.MIN_VALUE;
>         @Override
>         public void open(Configuration parameters) {
>             // Access the CEP operator's keyed NFAState ("nfaStateName") with
>             // the same serializer the operator itself uses.
>             computationStates = getRuntimeContext().getState(new 
> ValueStateDescriptor<>(NFA_STATE_NAME, new NFAStateSerializer()));
>         }
>         @Override
>         public void readKey(DynamicTuple key, Context ctx, 
> Collector<NFAStateOutput> out) throws Exception {
>             NFAState nfaState = computationStates.value();
>             // A start timestamp of -1 marks the starting computation state
>             // (no event consumed yet). Anything else at the head of the
>             // partial matches queue means this key still has a live match.
>             if 
> (Objects.requireNonNull(nfaState.getPartialMatches().peek()).getStartTimestamp()
>  != -1) {
>                 minTimestamp = Math.min(minTimestamp, 
> nfaState.getPartialMatches().peek().getStartTimestamp());
>                 minKeyForCurrentNfa = Math.min(minKeyForCurrentNfa, 
> Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForCurrentNfa = Math.max(maxKeyForCurrentNfa, 
> Long.parseLong(key.getTuple().getField(0)));
>                 newNfaCount++;
>             } else {
>                 danglingNfaCount++;
>                 minKeyForDanglingNfa = Math.min(minKeyForDanglingNfa, 
> Long.parseLong(key.getTuple().getField(0)));
>                 maxKeyForDanglingNfa = Math.max(maxKeyForDanglingNfa, 
> Long.parseLong(key.getTuple().getField(0)));
>             }
>             NFAStateOutput nfaStateOutput =
>                     new NFAStateOutput(
>                             danglingNfaCount,
>                             minTimestamp,
>                             newNfaCount,
>                             minKeyForCurrentNfa,
>                             maxKeyForCurrentNfa,
>                             minKeyForDanglingNfa,
>                             maxKeyForDanglingNfa);
>             out.collect(nfaStateOutput);
>         }
>     }
>     @Data
>     @NoArgsConstructor
>     @AllArgsConstructor
>     static class NFAStateOutput implements Serializable {
>         private Long danglingNfaCount;
>         private Long minTimestamp;
>         private Long newNfaCount;
>         private Long minKeyForCurrentNfa;
>         private Long maxKeyForCurrentNfa;
>         private Long minKeyForDanglingNfa;
>         private Long maxKeyForDanglingNfa;
>     }
> }
> {code}
>  
> As output, it printed an NFAStateOutput record per key, but since all of the 
> attributes are running aggregates, the final record printed was:
> {code:java}
> NFAStateReaderTest.NFAStateOutput(danglingNfaCount=34391, 
> minTimestamp=1690359951958, newNfaCount=3000, minKeyForCurrentNfa=6244230, 
> maxKeyForCurrentNfa=6247229, minKeyForDanglingNfa=629818, 
> maxKeyForDanglingNfa=6244229){code}
>  
> As we can see, the checkpoint is storing approximately 34,391 dangling states 
> (for keys that have already expired, i.e., matched or timed out), whereas 
> there are only 3,000 active keys (keys with partial matches still eligible 
> for further pattern matching). The latter figure is expected, since a 
> throughput of 10 events per second amounts to 3,000 unique keys within any 
> 5-minute window.
> h2. Questions
> Hence, I am curious about the reasoning behind this design choice, 
> specifically why the starting state remains in the partial matches queue for 
> all keys, even those that have either timed out or completed their matches.
> Additionally, I am wondering what the implications would be if we were to 
> delete this starting state (see the sketch below), assuming that
>  # it is the only state left in the partial matches queue, and
>  # the completed matches queue in the NFAState is empty.
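> A minimal sketch of what such a cleanup could look like, written against the 
> keyed ValueState<NFAState> that the CEP operator (and the reader above) uses. 
> maybeClearNfaState is a hypothetical helper; the actual operator internals 
> may require a different hook:
> {code:java}
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.cep.nfa.NFAState;
> 
> public class NfaStateCleanupSketch {
> 
>     /**
>      * Hypothetical cleanup: drop the keyed NFAState entirely when only the
>      * starting computation state(s) remain (startTimestamp == -1) and no
>      * completed matches are buffered, so fully expired keys stop
>      * accumulating in the checkpoint.
>      */
>     static void maybeClearNfaState(ValueState<NFAState> computationStates) throws Exception {
>         NFAState nfaState = computationStates.value();
>         if (nfaState == null) {
>             return;
>         }
>         boolean onlyStartStatesLeft =
>                 nfaState.getPartialMatches().stream()
>                         .allMatch(cs -> cs.getStartTimestamp() == -1);
>         if (onlyStartStatesLeft && nfaState.getCompletedMatches().isEmpty()) {
>             computationStates.clear(); // removes the NFAState entry for the current key
>         }
>     }
> }
> {code}
> The two questions above are essentially about whether clearing the state 
> under these two conditions has side effects elsewhere in the NFA or timer 
> bookkeeping.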



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
