Here is the snippet:

public interface Rule {
    DataStream<Alert> run();
}

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1";

    // The stream this rule evaluates, supplied via the constructor (see main()).
    private final DataStream<MyMessage1> MyMessage1DataStream;

    public Rule1(DataStream<MyMessage1> MyMessage1DataStream) {
        this.MyMessage1DataStream = MyMessage1DataStream;
    }

    @Override
    public DataStream<Alert> run() {

        // Two consecutive MyMessage1 events on the same key, within 15 minutes.
        Pattern<MyMessage1, ?> MyMessage1Pattern =
                Pattern.<MyMessage1>begin("first")
                        .subtype(MyMessage1.class)
                        .next("second")
                        .subtype(MyMessage1.class)
                        .within(Time.minutes(15));

        PatternStream<MyMessage1> MyMessage1PatternStream =
                CEP.pattern(
                        MyMessage1DataStream.keyBy("field1", "field2"),
                        MyMessage1Pattern
                );

        return MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {

                        String alertMessage =
                                String.format("Cep Alert. Rule ID : %s", RULE_ID);

                        return new CEPAlert(alertMessage);
                    }
                }
        );

    }
}

// Driver class holding the static helpers and main() (class name here is illustrative;
// the enclosing class was not shown in the original paste).
public class CepStreamingJob {

    private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule1(MyMessage1DataStream));

        return rules;
    }


    private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage2> MyMessage2DataStream) {
        List<Rule> rules = new ArrayList<Rule>();

        rules.add(new Rule2(MyMessage2DataStream));
        return rules;
    }


    public static RichParallelSourceFunction<MyMessage1> getStreamSource1(
            StreamExecutionEnvironment env, ParameterTool parameterTool) {

        // Checkpointing configuration: exactly-once, at most one concurrent checkpoint,
        // with the interval taken from the job properties.
        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        KafkaDataSource<MyMessage1> flinkCepConsumer =
                new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());

        return flinkCepConsumer;
    }


    public static RichParallelSourceFunction<MyMessage2> getStreamSource2(
            StreamExecutionEnvironment env, ParameterTool parameterTool) {

        // Same checkpointing configuration as for the first source.
        env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
        env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        KafkaDataSource<MyMessage2> flinkCepConsumer =
                new KafkaDataSource<MyMessage2>(parameterTool, new MyMessage2SerDeSchema());

        return flinkCepConsumer;
    }


    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<MyMessage1> message1Stream = env.addSource(
                getStreamSource1(env, parameterTool)
        );

        DataStream<MyMessage2> message2Stream = env.addSource(
                getStreamSource2(env, parameterTool)
        );

        // Run every rule and print its alerts.
        getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
        getStream2RulesToExecute(message2Stream).forEach(rule -> rule.run().print());

        env.execute(STREAMING_JOB_NAME);
    }
}






On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> I talked a bit with Kostas on what may be happening here.
>
> It could be that your patterns are not closing, which depends on the
> pattern construction of your CEP job.
> Could you perhaps provide an overview / code snippet of what your CEP job
> is doing?
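
(For reference, the pattern in the snippet at the top of this mail is bounded with
within(Time.minutes(15)). My understanding of how a pattern "closes" is sketched
below; this is illustrative code rather than the exact job code.)

    // Without within(), the CEP operator has no time bound after which it can
    // discard a partial match, so state for open matches can accumulate.
    // With within(), partial matches older than the window can be pruned.
    Pattern<MyMessage1, ?> bounded =
            Pattern.<MyMessage1>begin("first")
                    .subtype(MyMessage1.class)
                    .next("second")
                    .subtype(MyMessage1.class)
                    .within(Time.minutes(15));   // bounds how long a partial match is kept
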
>
> Looping Kostas (in CC) also to this thread as he may have a better idea
> what is happening here.
>
> Cheers,
> Gordon
>
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (
> flinken...@gmail.com) wrote:
>
> Thanks for the reply. Well, tracing back to the root cause, I see the
> following:
>
> 1. At the JobManager, the checkpoint times are getting progressively worse:
>
> JobManager log:
>
> 2017-09-16 05:05:50,813 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 1 @ 1505538350809
> 2017-09-16 05:05:51,396 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 1 (11101233 bytes in 586 ms).
> 2017-09-16 05:07:30,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 2 @ 1505538450809
> 2017-09-16 05:07:31,657 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 2 (18070955 bytes in 583 ms).
>
>                                  [... checkpoints 3 through 88 elided ...]
> 2017-09-16 07:32:58,117 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 89 (246125113 bytes in 27194 ms).
> 2017-09-16 07:34:10,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 90 @ 1505547250809
> 2017-09-16 07:34:44,932 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 90 (248272325 bytes in 34012 ms).
> 2017-09-16 07:35:50,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 91 @ 1505547350809
> 2017-09-16 07:36:37,058 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 91 (250348812 bytes in 46136 ms).
> 2017-09-16 07:37:30,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 92 @ 1505547450809
> 2017-09-16 07:38:18,076 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 92 (252399724 bytes in 47152 ms).
> 2017-09-16 07:39:10,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 93 @ 1505547550809
> 2017-09-16 07:40:13,494 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 93 (254374636 bytes in 62573 ms).
> 2017-09-16 07:40:50,809 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 94 @ 1505547650809
> 2017-09-16 07:42:42,850 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 94 (256386533 bytes in 111898 ms).
> 2017-09-16 07:42:42,850 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 95 @ 1505547762850
> 2017-09-16 07:46:06,241 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Completed checkpoint 95 (258441766 bytes in 203268 ms).
> 2017-09-16 07:46:06,241 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> - Triggering checkpoint 96 @ 1505547966241
> 2017-09-16 07:48:42,069 INFO  org.apache.flink.runtime.
> executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map
> (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:970)
>     at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for
> operator KeyedCEPPatternOperator -> Map (1/4).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:897)
>     ... 5 more
>
>
> So it looks like the job ran out of memory (GC overhead limit exceeded while
> materializing checkpoint 96), thanks to the progressively growing checkpoints.
> Any ideas on how to make the checkpoints faster?
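
(One option that might help with checkpoint cost, if the growing CEP state is
confirmed as the cause, is the RocksDB state backend with incremental checkpoints,
so each checkpoint writes only the delta instead of the full state. Rough sketch
only; it assumes the flink-statebackend-rocksdb dependency is on the classpath and
"checkpointDir" is a placeholder URI such as an HDFS or S3 path.)

    // Sketch: RocksDB state backend with incremental checkpointing enabled
    // (second constructor argument), available since Flink 1.3.
    env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

(This makes individual checkpoints cheaper, but it does not by itself explain why
the state keeps growing in the first place.)
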
>
>
>
>
>
>
> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Sridhar,
>>
>> Sorry that this didn't get a response earlier.
>>
>> According to the trace, it seems like the job failed during the process,
>> and
>> when trying to automatically restore from a checkpoint, deserialization
>> of a
>> CEP `IterativeCondition` object failed. As far as I can tell, CEP
>> operators
>> are just using Java serialization on CEP `IterativeCondition` objects, so
>> should not be related to the protobuf serializer that you are using.
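
(For my own understanding: the conditions attached to a pattern via where() are
IterativeCondition instances, and since the operator Java-serializes them, anything
they capture must be Serializable. A minimal illustrative sketch with a made-up
field accessor, not code from the job:)

    // SimpleCondition is an IterativeCondition that ignores the match context.
    // It is Java-serialized with the operator, so it must not capture
    // non-serializable objects from the enclosing scope.
    Pattern<MyMessage1, MyMessage1> guarded =
            Pattern.<MyMessage1>begin("first")
                    .subtype(MyMessage1.class)
                    .where(new SimpleCondition<MyMessage1>() {
                        @Override
                        public boolean filter(MyMessage1 value) {
                            return value.getField1() != null;   // hypothetical getter
                        }
                    });
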
>>
>> Is this still constantly happening for you?
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>
>
