The watermark has not advanced far enough for this pattern to succeed (or otherwise). The issue, though, is that this happens pretty early in the pipeline (within about a minute). I am replaying from a Kafka topic, but the keyed operator has emitted little more than 1500 elements to SelectCEPOperator (clearly visible in the UI), so I am sure not enough elements have been added to the SharedBuffer to create memory stress.
The nature of the input stream is that events are pushed out with a specific timestamp (it is a time series, and the timestamp is the beginning of the time slot). That is, one will see a bunch of elements with a constant timestamp until the next batch appears. A batch, though, never has more elements than the number of keys (600).

On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:

> This is a pretty simple pattern: I have put in barely 1500 elements (across
> 600 keys at most), and though I have a pretty wide range, as in I am looking
> at a relaxed pattern (something like 40 true conditions in 6 hours), I get
> this. I have EventTime turned on.
>
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:3332)
>     at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>     at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>     at java.lang.StringBuilder.append(StringBuilder.java:136)
>     at java.lang.StringBuilder.append(StringBuilder.java:131)
>     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>     at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>     at java.lang.String.valueOf(String.java:2994)
>     at java.lang.StringBuilder.append(StringBuilder.java:131)
>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>     at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>     at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>     at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>     .
>     .
>     .
>
> Has anyone seen this issue?