Here is the snippet:

public interface Rule {
    DataStream<Alert> run();
}
// --- Rule1.java ---
// Flink imports for the classes used below; user-defined types such as
// MyMessage1, MyMessage2, Alert, CEPAlert, KafkaDataSource and the
// *SerDeSchema classes are defined elsewhere and not shown.
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class Rule1 implements Rule {

    private static final String RULE_ID = "Rule1";

    // Rule1 is constructed with the stream it runs on (see getStream1RulesToExecute below).
    private final DataStream<MyMessage1> MyMessage1DataStream;

    public Rule1(DataStream<MyMessage1> MyMessage1DataStream) {
        this.MyMessage1DataStream = MyMessage1DataStream;
    }

    @Override
    public DataStream<Alert> run() {
        Pattern<MyMessage1, ?> MyMessage1Pattern = Pattern.<MyMessage1>begin("first")
                .subtype(MyMessage1.class)
                .next("second")
                .subtype(MyMessage1.class)
                .within(Time.minutes(15));

        PatternStream<MyMessage1> MyMessage1PatternStream = CEP.pattern(
                MyMessage1DataStream.keyBy("field1", "field2"),
                MyMessage1Pattern);

        return MyMessage1PatternStream.select(
                new PatternSelectFunction<MyMessage1, Alert>() {
                    @Override
                    public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
                        String alertMessage = String.format("Cep Alert. Rule ID : %s", RULE_ID);
                        return new CEPAlert(alertMessage);
                    }
                });
    }
}

// --- job driver methods ---
// The enclosing class and its constants (CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL,
// DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS, PROPS_FILE_ARG_INDEX, STREAMING_JOB_NAME)
// are not shown in the snippet.

private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
    List<Rule> rules = new ArrayList<Rule>();
    rules.add(new Rule1(MyMessage1DataStream));
    return rules;
}

private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage2> MyMessage2DataStream) {
    List<Rule> rules = new ArrayList<Rule>();
    rules.add(new Rule2(MyMessage2DataStream)); // Rule2 (not shown) runs over the MyMessage2 stream
    return rules;
}

public static RichParallelSourceFunction<MyMessage1> getStreamSource1(StreamExecutionEnvironment env, ParameterTool parameterTool) {
    env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
    env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    KafkaDataSource<MyMessage1> flinkCepConsumer =
            new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());
    return flinkCepConsumer;
}

public static RichParallelSourceFunction<MyMessage2> getStreamSource2(StreamExecutionEnvironment env, ParameterTool parameterTool) {
    env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
    env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    KafkaDataSource<MyMessage2> flinkCepConsumer =
            new KafkaDataSource<MyMessage2>(parameterTool, new MyMessage2SerDeSchema());
    return flinkCepConsumer;
}

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(parameterTool);

    DataStream<MyMessage1> message1Stream = env.addSource(getStreamSource1(env, parameterTool));
    DataStream<MyMessage2> message2Stream = env.addSource(getStreamSource2(env, parameterTool));

    getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
    getStream2RulesToExecute(message2Stream).forEach(rule -> rule.run().print());

    env.execute(STREAMING_JOB_NAME);
}
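One thing I have not tried yet, but that should make it visible whether the patterns are actually closing within the 15-minute window: selecting with a timeout handler as well, so that timed-out partial matches are emitted on the Left side of an Either. This is only a sketch against the snippet above (inside Rule1.run(), reusing MyMessage1PatternStream, RULE_ID and CEPAlert; PatternTimeoutFunction and Either are the standard flink-cep / Flink types):

// Sketch only: same pattern stream as in Rule1.run(), but selecting with a
// timeout handler so that timed-out partial matches show up as Left values.
// import org.apache.flink.cep.PatternTimeoutFunction;
// import org.apache.flink.types.Either;

DataStream<Either<String, Alert>> alertsAndTimeouts = MyMessage1PatternStream.select(
        new PatternTimeoutFunction<MyMessage1, String>() {
            @Override
            public String timeout(Map<String, List<MyMessage1>> pattern, long timeoutTimestamp) throws Exception {
                // A partial match that did not complete within Time.minutes(15).
                return String.format("Rule %s: partial match timed out at %d", RULE_ID, timeoutTimestamp);
            }
        },
        new PatternSelectFunction<MyMessage1, Alert>() {
            @Override
            public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
                return new CEPAlert(String.format("Cep Alert. Rule ID : %s", RULE_ID));
            }
        });

// Printing (or counting) the Left side would show whether partial matches
// are being pruned at the expected rate.
alertsAndTimeouts.print();

If the Left side stays silent while the checkpoint size keeps growing, that would suggest the partial matches are not being pruned as expected.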
On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> I talked a bit with Kostas on what may be happening here.
>
> It could be that your patterns are not closing, which depends on the
> pattern construction of your CEP job.
> Could you perhaps provide an overview / code snippet of what your CEP job
> is doing?
>
> Looping Kostas (in CC) also to this thread as he may have a better idea
> what is happening here.
>
> Cheers,
> Gordon
>
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa (
> flinken...@gmail.com) wrote:
>
> Thanks for the reply. Well, tracing back to the root cause, I see the
> following:
>
> 1. At the Job manager, the Checkpoint times are getting worse:
>
> Jobmanager:
>
> Checkpoint times are getting worse progressively.
>
> 2017-09-16 05:05:50,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505538350809
> 2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).
> 2017-09-16 05:07:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809
> 2017-09-16 05:07:31,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (18070955 bytes in 583 ms).
> .
> .
> .
> 2017-09-16 07:32:58,117 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 89 (246125113 bytes in 27194 ms).
> 2017-09-16 07:34:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 90 @ 1505547250809
> 2017-09-16 07:34:44,932 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 90 (248272325 bytes in 34012 ms).
> 2017-09-16 07:35:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 91 @ 1505547350809
> 2017-09-16 07:36:37,058 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 91 (250348812 bytes in 46136 ms).
> 2017-09-16 07:37:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 92 @ 1505547450809
> 2017-09-16 07:38:18,076 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 92 (252399724 bytes in 47152 ms).
> 2017-09-16 07:39:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 93 @ 1505547550809
> 2017-09-16 07:40:13,494 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 93 (254374636 bytes in 62573 ms).
> 2017-09-16 07:40:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 94 @ 1505547650809
> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 94 (256386533 bytes in 111898 ms).
> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850
> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).
> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 96 @ 1505547966241
> 2017-09-16 07:48:42,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>     ... 5 more
>
> So, it looks like the Job Manager ran out of memory, thanks to the
> "Progressively Getting Worse" checkpoints. Any ideas on how to make the
> checkpoints faster?
>
> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Sridhar,
>>
>> Sorry that this didn't get a response earlier.
>>
>> According to the trace, it seems like the job failed during the process,
>> and when trying to automatically restore from a checkpoint,
>> deserialization of a CEP `IterativeCondition` object failed. As far as I
>> can tell, CEP operators are just using Java serialization on CEP
>> `IterativeCondition` objects, so it should not be related to the protobuf
>> serializer that you are using.
>>
>> Is this still constantly happening for you?
>>
>> Cheers,
>> Gordon
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
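Coming back to the question above about making the checkpoints faster: one option that is commonly suggested for steadily growing keyed state (not something confirmed as the fix for this job, just a sketch of what I would try) is the RocksDB state backend with incremental checkpoints, available from Flink 1.3 on. The checkpoint URI and class name below are placeholders, and it assumes the flink-statebackend-rocksdb dependency is on the classpath:

// Sketch only: enabling the RocksDB state backend with incremental checkpoints.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // same interval knob as in the job above
        // true = incremental checkpoints: only the RocksDB delta is uploaded each time,
        // instead of the full (and here, steadily growing) keyed state.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... rest of the job wiring as in main() above ...
        env.execute("cep-job-with-rocksdb");
    }
}

Incremental checkpoints only reduce how much is uploaded per checkpoint, though; if the CEP pattern state itself keeps growing, that growth still has to be addressed at the pattern level.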