Hi Aarti, check this: https://haoch.github.io/flink-siddhi/

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);

// Register a custom function so the query can call it as custom:plus(...)
cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);

// input1 and input2 are existing DataStreams (not shown here)
cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");

DataStream<Tuple5<Integer,String,Integer,String,Double>> output = cep
    .from("inputStream1").union("inputStream2")
    .cql(
        // Pattern: an event with id 2 on inputStream1, followed by id 3 on inputStream2
        "from every s1 = inputStream1[id == 2] "
        + " -> s2 = inputStream2[id == 3] "
        + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price,s2.price) as price "
        + "insert into outputStream")
    .returns("outputStream");

env.execute();

We came across this after developing our PoC.

On Mon, Jul 9, 2018 at 5:12 PM, Puneet Kinra <puneet.ki...@customercentria.com> wrote:

> Check siddhi project.
>
> On Mon, Jul 9, 2018 at 5:09 PM, Aarti Gupta <aagu...@qualys.com> wrote:
>
>> Hi,
>>
>> We are evaluating Esper <http://www.espertech.com/> to use as a CEP
>> plugged into Flink.
>>
>> We would want to use Flink's connected streams to connect our rules and
>> events streams and then invoke Esper CEP in the co-process function to
>> evaluate the rules against the events.
>>
>> Would there be any gotchas if we did this?
>>
>> --Aarti
>>
>> On Thu, Jul 5, 2018 at 6:37 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> > Flink doesn't support connecting multiple streams with heterogeneous schema
>>>
>>> This is not correct.
>>> Flink is very well able to connect streams with different schema.
>>> However, you cannot union two streams with different schema.
>>> In order to reconfigure an operator with changing rules, you can use
>>> BroadcastProcessFunction or KeyedBroadcastProcessFunction [1].
>>>
>>> In order to dynamically reconfigure aggregations and windowing, you
>>> would need to implement the processing logic yourself in the process
>>> function using state and timers.
>>> There is no built-in support to reconfigure such operators.
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/broadcast_state.html
>>>
>>> 2018-07-05 14:41 GMT+02:00 Puneet Kinra <puneet.kinra@customercentria.com>:
>>>
>>>> Hi Aarti
>>>>
>>>> Flink doesn't support connecting multiple streams with heterogeneous
>>>> schema; you can try the solution below.
>>>>
>>>> a) If stream A is sending some events, make the output of that a
>>>> String/JsonString.
>>>>
>>>> b) If stream B is sending some events, make the output of that a
>>>> String/JsonString.
>>>>
>>>> c) Now, using the union function, you can connect all the streams and
>>>> use a FlatMap or process function to evaluate all these streams
>>>> against your defined rules.
>>>>
>>>> d) For storing your aggregations and rules, you can build your cache
>>>> layer and pass it as an argument to the constructor of that flatmap.
>>>>
>>>> On Mon, Jul 2, 2018 at 2:38 PM, Aarti Gupta <aagu...@qualys.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We are currently evaluating Flink to build a real time rule engine
>>>>> that looks at events in a stream and evaluates them against a set of
>>>>> rules.
>>>>>
>>>>> The rules are dynamically configured and can be of three types -
>>>>> 1. Simple Conditions - these require you to look inside a single
>>>>> event. Example, match rule if A happens.
>>>>> 2. Aggregations - these require you to aggregate multiple events.
>>>>> Example, match rule if more than five A's happen.
>>>>> 3. Complex patterns - these require you to look at multiple events and
>>>>> detect patterns. Example, match rule if A happens and then B happens.
>>>>>
>>>>> Since the rules are dynamically configured, we cannot use CEP.
>>>>>
>>>>> As an alternative, we are using connected streams and the CoFlatMap
>>>>> function to store the rules in shared state, and evaluate each incoming
>>>>> event against the stored rules. Implementation is similar to what's
>>>>> outlined here
>>>>> <https://data-artisans.com/blog/bettercloud-dynamic-alerting-apache-flink>.
>>>>>
>>>>> My questions -
>>>>>
>>>>> 1. Since the CoFlatMap function works on a single event, how do we
>>>>> evaluate rules that require aggregations across events? (Match rule if
>>>>> more than 5 A events happen)
>>>>> 2. Since the CoFlatMap function works on a single event, how do we
>>>>> evaluate rules that require pattern detection across events? (Match
>>>>> rule if A happens, followed by B)
>>>>> 3. How do you dynamically define a window function?
>>>>>
>>>>> --Aarti
>>>>>
>>>>> --
>>>>> Aarti Gupta <https://www.linkedin.com/company/qualys>
>>>>> Director, Engineering, Correlation
>>>>>
>>>>> aagu...@qualys.com
>>>>> T
>>>>>
>>>>> Qualys, Inc. – Blog <https://qualys.com/blog> | Community
>>>>> <https://community.qualys.com> | Twitter <https://twitter.com/qualys>
>>>>>
>>>>> <https://www.qualys.com/email-banner>
>>>>
>>>> --
>>>> *Cheers *
>>>>
>>>> *Puneet Kinra*
>>>>
>>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>>>> <puneet.ki...@customercentria.com>*
>>>>
>>>> *e-mail :puneet.ki...@customercentria.com
>>>> <puneet.ki...@customercentria.com>*
>>>
>>
>> --
>> Aarti Gupta <https://www.linkedin.com/company/qualys>
>> Director, Engineering, Correlation
>>
>> aagu...@qualys.com
>> T
>>
>> Qualys, Inc.
– Blog <https://qualys.com/blog> | Community
>> <https://community.qualys.com> | Twitter <https://twitter.com/qualys>
>>
>> <https://www.qualys.com/email-banner>
>
> --
> *Cheers *
>
> *Puneet Kinra*
>
> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*
>
> *e-mail :puneet.ki...@customercentria.com
> <puneet.ki...@customercentria.com>*

--
*Cheers *

*Puneet Kinra*

*Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*

*e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*
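
For reference, the "implement the processing logic yourself using state" idea from the thread can be sketched roughly as below for the "more than five A's" kind of rule. This is plain Java, not Flink API: the class DynamicRuleSketch, the CountRule shape, and the methods updateRule and onEvent are hypothetical names made up for illustration. In an actual job the two maps would presumably live in broadcast state and keyed state, and time-based windows would additionally need timers, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a two-input operator: one input updates rules, the other carries events. */
class DynamicRuleSketch {

    /** Hypothetical rule shape: match once more than `threshold` events of `eventType` were seen. */
    static final class CountRule {
        final String eventType;
        final int threshold;

        CountRule(String eventType, int threshold) {
            this.eventType = eventType;
            this.threshold = threshold;
        }
    }

    // Assumption: in a Flink job these maps would be kept in managed state, not on the heap.
    private final Map<String, CountRule> rules = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    /** Rule-stream side: add or replace a rule and restart its counter. */
    void updateRule(String ruleId, String eventType, int threshold) {
        rules.put(ruleId, new CountRule(eventType, threshold));
        counts.put(ruleId, 0);
    }

    /** Event-stream side: update the counters and return the ids of rules that match now. */
    List<String> onEvent(String eventType) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, CountRule> entry : rules.entrySet()) {
            CountRule rule = entry.getValue();
            if (!rule.eventType.equals(eventType)) {
                continue; // this rule does not care about this event type
            }
            int seen = counts.merge(entry.getKey(), 1, Integer::sum);
            if (seen > rule.threshold) {
                matched.add(entry.getKey());
                counts.put(entry.getKey(), 0); // start counting again after a match
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        DynamicRuleSketch engine = new DynamicRuleSketch();
        engine.updateRule("moreThanFiveA", "A", 5);
        for (int i = 1; i <= 6; i++) {
            System.out.println("A #" + i + " -> " + engine.onEvent("A"));
        }
    }
}
```

Because rules arrive through updateRule at runtime, a new threshold or a new rule takes effect on the next event without redeploying anything, which is the property the original question was after. Resetting the counter after a match is just one possible choice; a rule could equally keep matching on every further event.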