Hello all,

There were recent changes to flink master that I pulled in, and they
*seem* to have solved our issue.

A few points:

* CEP is heavy: the NFA keeps its transition matrix as state, which can
be as large as n^2 in the worst case and can easily blow up space
requirements. The after-match skip strategy is likely to play a crucial
role in keeping that state lean:
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/cep.html#after-match-skip-strategy
In our case we do not require partial matches within a match to
contribute to another potential match ( noise for us ), so
*SKIP_PAST_LAST_EVENT* was used, which on a match prunes the
SharedBuffer ( almost resets it ). A minimal sketch follows this list.

* The argument that pattern events should be lean holds much more
strongly in CEP, given the potential exponential increase in space
requirements; a projection sketch also follows this list.

* If state does blow up for you, the nature of the pattern itself will
require consideration.
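
To make the skip strategy concrete, a minimal sketch, assuming a build
of master that carries the after-match skip strategies ( Event,
condition, n and the 8 hour window are placeholders ):

AfterMatchSkipStrategy skip = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<Event, ?> pattern = Pattern
        .<Event>begin("start", skip) // skip strategy is attached at begin()
        .where(condition)
        .times(n)
        .within(Time.hours(8));

On a completed match this discards the partial matches that overlap it,
which is what ( almost ) resets the SharedBuffer for us.

For lean events, a sketch of projecting the stream before handing it to
CEP ( SlimEvent and its fields are hypothetical ):

DataStream<SlimEvent> slim = input
        // keep only the fields the conditions actually inspect; every
        // partial match held in the SharedBuffer hangs on to the event
        .map(e -> new SlimEvent(e.getKey(), e.getTimestamp()));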

Apart from that, I am still not sure why toString() on SharedBuffer was
called to get an OOM to begin with.



On Mon, Feb 26, 2018 at 2:09 PM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> We could not recreate this in a controlled setup, but here are a few
> notes we have gathered on a simple "times(n).within(..)" pattern.
>
> In the case where an Event does not create a Final or Stop state:
>
> * As an NFA processes Events, it mutates on each true Event. Each
> computation is a counter that keeps track of the partial match built up
> so far for that computation unit. Essentially, for n Events that are all
> true there will be roughly n-1 computations, each representing a start
> Event from 1 to n-1 ( so the 1st has n-1 events in its partial match,
> the 2nd has n-2 events, and so on, until the (n-1)th has just the last
> event as its partial match ).
>
> * If the watermark ( WM ) progresses beyond the timestamp of the 1st
> computation, that partial match is pruned.
>
> * It makes sure that a SharedBufferEntry is pruned only if the count of
> Edges originating from it drops to 0 ( the internalRemove(), which uses
> a Stack ), which should happen as the WM keeps progressing to the nth
> element for unfulfilled patterns. A "null" ( not a fan ) event is used
> to establish WM progression.
>
>
> In the case where there is a FinalState ( and we skipToFirstAfterLast ):
>
> * The NFA will prune ( release ) all partial matches, prune the shared
> buffer, and emit the current match. The computations should now be
> empty.
>
> There is a lot to it, but is that roughly what is done in that code ?
>
>
>
> A few questions:
>
> * What we have seen is that the call to the toString() method of
> SharedBuffer is where the OOM occurs. In the code there is no Log call
> around it, so we are not sure why, or by whom, that method is called.
> Surely it is not part of the serialization/deserialization routine, or
> is it ( very surprising if it is )?
> * There is no out-of-the-box implementation of an "m out of n" pattern
> match. We have to resort to m matches in range ( n * time series slot ),
> which we do. This is fine, but it does not allow an optimization that
> prunes early: simply speaking, once n-m false conditions have been seen,
> there is no way m trues out of n can ever occur, and thus the
> SharedBuffer could be pruned back to the last true seen ( very akin to
> skipToFirstAfterLast ). A minimal sketch of the workaround we use
> follows.
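>
> The workaround shape, as a minimal sketch ( Event, condition, m, n and
> slotMinutes are placeholders for the bounds described above ):
>
> Pattern<Event, ?> mOfN = Pattern.<Event>begin("first")
>         .where(condition)
>         .followedBy("rest")
>         .where(condition)
>         .times(m - 1)                           // m true events in total
>         .within(Time.minutes(n * slotMinutes)); // within n slots of time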
>
> We will keep instrumenting the code ( which, apart from the null
> message, is easily understandable ), but we would love to hear your
> feedback.
>
> On Tue, Feb 6, 2018 at 12:00 PM, Kostas Kloudas <
> k.klou...@data-artisans.com> wrote:
>
>> Thanks a lot Vishal!
>>
>> We are looking forward to a test case that reproduces the failure.
>>
>> Kostas
>>
>>
>> On Feb 2, 2018, at 4:05 PM, Vishal Santoshi <vishal.santo...@gmail.com>
>> wrote:
>>
>> This is the pattern. Will create a test case.
>>
>> /**
>>  * @param condition a single condition applied as the acceptance criterion
>>  * @param params    defines the bounds of the pattern
>>  * @param mapFunc   applied to the list of pattern matches
>>  * @param <U>       the element type in the stream
>>  * @return the compiled pattern along with the params
>>  */
>> public static <U extends HasTime & HasKey> RelaxedContiguousPattern<U> of(
>>         SimpleCondition<U> condition,
>>         RelaxedContiguityWithinTime params,
>>         RichMapFunction<List<PatternMatch<U>>, List<PatternMatch<U>>> mapFunc,
>>         String patternId) {
>>     // elementCount ( m ) matches are sought within seriesLength ( n ) slots
>>     assert (params.seriesLength >= params.elementCount && params.elementCount > 0);
>>     Pattern<U, ?> pattern = Pattern
>>             .<U>begin(START)
>>             .where(condition);
>>     if (params.elementCount > 1) {
>>         pattern = pattern
>>                 .followedBy(REST)
>>                 .where(condition)
>>                 .times(params.elementCount - 1);
>>     }
>>
>>     return new RelaxedContiguousPattern<U>(
>>             pattern.within(Time.minutes(params.seriesLength * params.period.duration)),
>>             params,
>>             params.elementCount > 1,
>>             params.period.duration,
>>             mapFunc,
>>             patternId
>>     );
>> }
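>>
>> For completeness, a sketch of how the compiled pattern would then be
>> applied ( keyedStream is a placeholder, and I am assuming
>> RelaxedContiguousPattern exposes the underlying Pattern via a getter ):
>>
>> PatternStream<U> matches = CEP.pattern(keyedStream, relaxedPattern.getPattern());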
>>
>>
>>
>>
>> On Fri, Feb 2, 2018 at 7:53 AM, Dawid Wysakowicz <
>> wysakowicz.da...@gmail.com> wrote:
>>
>>> Could you provide some example to reproduce the case? Or the Pattern
>>> that you are using? It would help track down the issue.
>>>
>>> > On 2 Feb 2018, at 13:35, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>> >
>>> > I have pulled in the flink master CEP library, and the runtime ( the
>>> > cluster ) is configured to work against the latest and greatest. This
>>> > does not happen with smaller range patterns ( 3 out of 5, 1 of 3, etc. )
>>> > but is always an issue with a larger range ( 20 out of 25 within a
>>> > range of 8 hours ). Does that make sense?
>>> >
>>> > On Fri, Feb 2, 2018 at 5:17 AM, Dawid Wysakowicz <
>>> wysakowicz.da...@gmail.com> wrote:
>>> > This problem sounds very similar to this one that was fixed for 1.4.1
>>> and 1.5.0:
>>> > https://issues.apache.org/jira/browse/FLINK-8226
>>> >
>>> > Could you check if that helps with your problem too?
>>> >
>>> > > On 1 Feb 2018, at 23:34, Vishal Santoshi <vishal.santo...@gmail.com>
>>> wrote:
>>> > >
>>> > > I have flink master CEP library code imported to  a 1.4 build.
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:33 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>> > > A new one
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4097)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEdge.toString(SharedBuffer.java:673)
>>> > > .
>>> > > .
>>> > > .
>>> > > It is the toString() on SharedBuffer, no doubt. Some recursive loop ?
>>> > >
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:17 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>> > > It happens when it is about to throw an exception and calls
>>> > > sharedBuffer.toString(), because of the check:
>>> > >
>>> > > int id = sharedBuffer.entryId;
>>> > > Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
>>> > >
>>> > >
>>> > > On Thu, Feb 1, 2018 at 5:09 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>> > > The watermark has not moved for this pattern to succeed ( or
>>> > > otherwise ); the issue though is that this happens pretty early in
>>> > > the pipe ( like within a minute ). I am replaying from a kafka
>>> > > topic, but the keyed operator has emitted no more than 1500-plus
>>> > > elements to the SelectCEPOperator ( very visible on the UI ), so I
>>> > > am sure not enough elements have been added to the SharedBuffer to
>>> > > create memory stress.
>>> > >
>>> > > The nature of the input stream is that events are pushed out with a
>>> > > specific timestamp ( it is a time series, and the timestamp is the
>>> > > beginning of the time slot ), as in one will have a bunch of
>>> > > elements that share a constant timestamp until the next batch
>>> > > appears.
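>>> > >
>>> > > As a sketch, event time for such a slotted series is assigned along
>>> > > these lines ( assuming the event exposes its slot timestamp via a
>>> > > getTime() accessor ):
>>> > >
>>> > > input.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
>>> > >     @Override
>>> > >     public long extractAscendingTimestamp(Event e) {
>>> > >         // constant within a batch, non-decreasing across batches
>>> > >         return e.getTime();
>>> > >     }
>>> > > });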
>>> > >
>>> > > A batch, though, does not have more elements than the number of
>>> > > keys ( 600 ).
>>> > >
>>> > > On Thu, Feb 1, 2018 at 4:53 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>> > > This is a pretty simple pattern: I hardly have 1500 elements
>>> > > ( across 600 keys at the max ) put in, and though I have a pretty
>>> > > wide range, as in a relaxed pattern ( like 40 true conditions in 6
>>> > > hours ), I get this. I have EventTime turned on.
>>> > >
>>> > >
>>> > > java.lang.OutOfMemoryError: Java heap space
>>> > >       at java.util.Arrays.copyOf(Arrays.java:3332)
>>> > >       at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
>>> > >       at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:136)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4106)
>>> > >       at org.apache.commons.lang3.StringUtils.join(StringUtils.java:4151)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferEntry.toString(SharedBuffer.java:624)
>>> > >       at java.lang.String.valueOf(String.java:2994)
>>> > >       at java.lang.StringBuilder.append(StringBuilder.java:131)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:964)
>>> > >       at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:835)
>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
>>> > >       at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
>>> > >       at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:100)
>>> > > .
>>> > > .
>>> > > .
>>> > >
>>> > > Has anyone seen this issue?
>>> > >
>>> >
>>>
>>
>
