[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266320#comment-16266320 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch closed the pull request at:

    https://github.com/apache/flink/pull/2487

> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>              Labels: cep, library, patch-available
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use open source Complex Event Processing (CEP) engine released as a Java library under `Apache Software License v2.0`. Siddhi CEP processes events generated by various event sources, analyses them, and emits the appropriate complex events according to user-specified queries.
> It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
>   * Filter
>   * Join
>   * Aggregation
>   * Group by
>   * Having
>   * Window
>   * Conditions and Expressions
>   * Pattern processing
>   * Sequence processing
>   * Event Tables
>   ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
>   * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
>   * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan
>   * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
> h2. Test Cases
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
> // Register a custom function extension and the two input streams with their field names.
> cep.registerExtension("custom:plus", CustomPlusFunctionExtension.class);
> cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp");
> cep.registerStream("inputStream2", input2, "id", "name", "price", "timestamp");
> // NOTE: the element type was stripped from this copy of the description;
> // Tuple5<...> is assumed from the five selected fields.
> DataStream<Tuple5<Integer, String, Integer, String, Double>> output = cep
>     .from("inputStream1").union("inputStream2")
>     .sql(
>         "from every s1 = inputStream1[id == 2] "
>         + " -> s2 = inputStream2[id == 3] "
>         + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2, custom:plus(s1.price, s2.price) as price "
>         + "insert into outputStream"
>     )
>     .returns("outputStream");
> env.execute();
> {code}

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
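A note on multi-line queries like the one in the Example: Java joins adjacent string literals without inserting any separator, so every clause has to carry its own whitespace at the boundary (otherwise `as price` and `insert into` run together). A minimal sketch of a helper that avoids this by joining clauses with single spaces; `SiddhiQueryText` and `joinClauses` are hypothetical names used only for illustration and are not part of flink-siddhi or Siddhi:

```java
import java.util.StringJoiner;

// Hypothetical helper, not part of flink-siddhi: builds a query string
// from separate clauses so that clause boundaries always have whitespace.
final class SiddhiQueryText {

    private SiddhiQueryText() {
    }

    /** Trims each clause and joins them with exactly one space. */
    static String joinClauses(String... clauses) {
        StringJoiner joiner = new StringJoiner(" ");
        for (String clause : clauses) {
            joiner.add(clause.trim());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Same clauses as the Example in the issue description.
        String query = joinClauses(
            "from every s1 = inputStream1[id == 2]",
            "-> s2 = inputStream2[id == 3]",
            "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2,",
            "custom:plus(s1.price, s2.price) as price",
            "insert into outputStream");
        System.out.println(query);
    }
}
```

The resulting string could then be handed to `.sql(...)` in place of the hand-concatenated literal.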
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266319#comment-16266319 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    Finally, the PR has been merged as https://github.com/apache/bahir-flink/pull/22. Thanks everyone for the review!
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261822#comment-16261822 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    Keep this PR open until https://github.com/apache/bahir-flink/pull/22 is merged.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258055#comment-16258055 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    As mentioned above, moved the patch to Apache Bahir at https://github.com/apache/bahir/pull/54, resolve this one.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049980#comment-16049980 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @asdf2014 Sorry for the delayed response. I will find some time to finalize this PR and propose it to the Bahir project.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16049098#comment-16049098 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487

    @rmetzger Alright. Thank you for asking, but I think our company still plans to use `Flink-CEP`, and contributing the `Bahir-Siddhi` feature is a huge job, so... I'm so sorry.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16048979#comment-16048979 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487

    @asdf2014 I think there was never a PR for siddhi at the Bahir project. But if you are interested, you could work on contributing it to bahir.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044112#comment-16044112 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487

    @dianfu Great! Looking forward to it.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044098#comment-16044098 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/2487

    @asdf2014 We're currently working on integrating the Flink Table & SQL API with CEP. It will add more capabilities to the Flink CEP library and also make it easier to use. An initial design doc is available [here](https://docs.google.com/document/d/1HaaO5eYI1VZjyhtVPZOi3jVzikU7iK15H0YbniTnN30/edit#). The prototype and detailed design doc will come out very soon.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16044079#comment-16044079 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user asdf2014 commented on the issue:

    https://github.com/apache/flink/pull/2487

    Hi, @haoch @rmetzger. Why can't I find `flink-siddhi` in [bahir](https://github.com/apache/bahir) or [bahir-flink](https://github.com/apache/bahir-flink)? In addition, [flink-siddhi](https://github.com/haoch/flink-siddhi) still depends on `flink v1.1.2`. May I ask how long it will be before I can use these advanced features of `Siddhi CEP` on Flink?
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877693#comment-15877693 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487

    Cool, thank you!
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877680#comment-15877680 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @uce Sure, I will fix it and resend the PR to Bahir.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874820#comment-15874820 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2487

    @haoch What do you think about Robert's suggestion to move this to Bahir? Seems like a reasonable first step to me.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15601918#comment-15601918 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2487

    Hi @haoch, thanks a lot for this contribution.
    I recently started moving some of Flink's streaming connectors to Apache Bahir, a community for extensions to Spark, Flink (and maybe others).

    You wrote in an earlier comment:

    > I think it's both ok to keep this in the core or as a separate project, but the concern is it may be better for community development to centralize qualified libraries together.

    I think Bahir is addressing this issue nicely. So far we have added only streaming connectors to Bahir, but I would like to see libraries and other things built on top of Flink there as well.

    I'm a committer at Bahir and can help you get the code in there. The Bahir repository is located here: https://github.com/apache/bahir-flink

    By the way, the tests you've added are failing on our CI system. Can you look into it? https://s3.amazonaws.com/archive.travis-ci.org/jobs/166483919/log.txt

> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>            Assignee: Hao Chen
>              Labels: cep, library, patch-available
>             Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509478#comment-15509478 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @StephanEwen sure, that's all ok. :-)
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15509441#comment-15509441 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2487

    That all sounds very good. I would personally like it if this contribution would enter Flink in some way.

    There have been thoughts and discussions once in a while about creating a dedicated sub-project for libraries/extensions like this, or at least a dedicated repository under Flink. I think this would be a great opportunity to revive those discussions. Let me start a thread on the mailing list.

    @haoch I hope you are okay with waiting a few days for that discussion to come to a conclusion.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15507010#comment-15507010 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @StephanEwen thanks for the comments. I think it's fine either to keep this in the core or as a separate project, but my concern is that it may be better for community development to centralize qualified libraries together. As an alternative that addresses the test stability and dead code concerns, might it be possible to create another code repository, say "flink-library"?

    **BTW: here are the answers to your questions one by one:**

    > How complete is the implementation?

    Siddhi is a feature-rich CEP engine with its own community, and may be almost the only open source CEP solution compatible with the Apache License. This library, `flink-siddhi`, mainly focuses on bringing Siddhi's capabilities to Flink users seamlessly by:

    - Integrating the Siddhi CEP runtime with the Flink lifecycle
    - Schema and DataStream source mapping
    - State management and fault tolerance

    So I think it would be extremely lightweight but useful, and the current implementation should be almost complete.

    > Would you be up for maintaining this code?

    Sure. First of all, personally I am very willing to keep contributing to the Flink project in any way. We also use Siddhi with distributed streaming systems a lot in production, and are currently considering supporting Flink as well because of its better state management and window support. So I would continue to maintain the code if it is merged; if not, I would maintain it at https://github.com/haoch/flink-siddhi as well to make sure it keeps working.

    > Are you building this as an experiment, or building a production use case based on Siddhi on Flink?

    We use Siddhi in streaming environments in production a lot. It currently supports Storm and Spark Streaming, and we are also considering extending it to Flink.
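The "schema and DataStream source mapping" item in the comment above can be pictured with a small, dependency-free sketch. It is only an illustration under stated assumptions: `StreamSchemaSketch`, `siddhiType`, and `defineStream` are hypothetical names invented here and are not taken from flink-siddhi, whose actual mapping code is not shown in this thread. The sketch turns an ordered set of field names and Java types into a SiddhiQL `define stream` statement, which is roughly the kind of schema a call such as `cep.registerStream("inputStream1", input1, "id", "name", "price", "timestamp")` would have to derive from Flink type information.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration only; not part of flink-siddhi.
public class StreamSchemaSketch {

    // Maps a Java field type to a Siddhi attribute type name.
    // Anything without a direct counterpart falls back to "object".
    static String siddhiType(Class<?> javaType) {
        if (javaType == Integer.class || javaType == int.class) return "int";
        if (javaType == Long.class || javaType == long.class) return "long";
        if (javaType == Float.class || javaType == float.class) return "float";
        if (javaType == Double.class || javaType == double.class) return "double";
        if (javaType == Boolean.class || javaType == boolean.class) return "bool";
        if (javaType == String.class) return "string";
        return "object";
    }

    // Builds a SiddhiQL stream definition from field names and types,
    // preserving the iteration order of the given map.
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringJoiner attributes = new StringJoiner(", ", "(", ")");
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            attributes.add(field.getKey() + " " + siddhiType(field.getValue()));
        }
        return "define stream " + streamId + " " + attributes + ";";
    }

    public static void main(String[] args) {
        // Field names follow the example in the issue description;
        // the Java types are assumptions made for this sketch.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class);
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

A `LinkedHashMap` is used so that the attribute order in the generated definition matches the order in which the fields were registered, which matters because Siddhi events are positional.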
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506964#comment-15506964 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @mushketyk I have added a lot of Javadoc, as requested, in the latest commit: https://github.com/apache/flink/pull/2487/commits/4699f9c3dfc4ce0a9837eb60579c76d50b346f03. Please continue to help review.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15504300#comment-15504300 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2487

    Thank you for that big contribution. Siddhi looks like a cool approach to CEP.

    Before digging into the details, I would like to start a discussion about whether we should have this as a part of the core Flink repository, as a subproject, or if it would be best to have it initially as an external project.

    The reason is that the Flink repository is becoming a bit big right now. Build times are very long, test stability is hard to manage, and there is quite a bit of "dead" code that was contributed by someone at some point but seems rarely used and is not maintained by the contributors.

    To help have a good discussion, it would be great to learn a bit more:
      - How complete is the implementation?
      - Would you be up for maintaining this code?
      - Are you building this as an experiment, or building a production use case based on Siddhi on Flink?

    Thanks,
    Stephan

> Integrate Siddhi as a lightweight CEP Library
> ---------------------------------------------
>
>                 Key: FLINK-4520
>                 URL: https://issues.apache.org/jira/browse/FLINK-4520
>             Project: Flink
>          Issue Type: New Feature
>          Components: CEP
>    Affects Versions: 1.2.0
>            Reporter: Hao Chen
>              Labels: cep, library, patch-available
>             Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15486023#comment-15486023 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @mushketyk thanks very much for reviewing, will fix as required soon.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485230#comment-15485230 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2487

    Hi @haoch, I think it would be beneficial if you write a few words describing your design or add more JavaDocs. This would make the review process more straightforward.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485210#comment-15485210 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78449235

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/SiddhiOperatorContext.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.util.Preconditions;
    +import org.wso2.siddhi.core.SiddhiManager;
    +
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * SiddhiCEP Operator Execution Context
    + */
    +public class SiddhiOperatorContext implements Serializable {
    --- End diff --

    I think all public methods would benefit from JavaDocs
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485183#comment-15485183 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78447224

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wso2.siddhi.core.ExecutionPlanRuntime;
    +import org.wso2.siddhi.core.SiddhiManager;
    +import org.wso2.siddhi.core.stream.input.InputHandler;
    +import org.wso2.siddhi.query.api.definition.AbstractDefinition;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +
    +public abstract class AbstractSiddhiOperator extends AbstractStreamOperator implements OneInputStreamOperator {
    +    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    +    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    +
    +    private final SiddhiOperatorContext siddhiPlan;
    +    private final String executionExpression;
    +    private final boolean isProcessingTime;
    +    private final Map streamRecordSerializers;
    +
    +    private transient SiddhiManager siddhiManager;
    +    private transient ExecutionPlanRuntime siddhiRuntime;
    +    private transient Map inputStreamHandlers;
    +
    +    // queue to buffer out of order stream records
    +    private transient PriorityQueue priorityQueue;
    +
    +    /**
    +     * @param siddhiPlan Siddhi CEP Execution Plan
    +     */
    +    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
    +        validate(siddhiPlan);
    +        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
    +        this.siddhiPlan = siddhiPlan;
    +        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    +        this.streamRecordSerializers = new HashMap<>();
    +
    +        for (String streamId : this.siddhiPlan.getInputStreams()) {
    +            streamRecordSerializers.put(streamId, createStreamRecordSerializer(this.siddhiPlan.getInputStreamSchema(streamId), this.siddhiPlan.getExecutionConfig()));
    +        }
    +    }
    +
    +    protected abstract MultiplexingStreamRecordSerializer createStreamRecordSerializer(StreamSchema
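The diff above declares a `PriorityQueue` "to buffer out of order stream records", but the quoted hunk ends before showing how the queue is used. As a rough illustration only, and not the pull request's actual logic, the sketch below buffers timestamped records and releases them in timestamp order once a watermark has passed them. `OutOfOrderBuffer`, `TimestampedRecord`, `add` and `onWatermark` are hypothetical names, and no Flink or Siddhi classes are involved.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch: buffer out-of-order records until a watermark passes them. */
final class OutOfOrderBuffer {

    /** Hypothetical stand-in for a stream record: an event timestamp plus a payload. */
    static final class TimestampedRecord {
        final long timestamp;
        final String value;

        TimestampedRecord(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Same constant name and value as in the quoted diff.
    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;

    // Orders buffered records by ascending timestamp.
    private final PriorityQueue<TimestampedRecord> priorityQueue =
            new PriorityQueue<TimestampedRecord>(
                    INITIAL_PRIORITY_QUEUE_CAPACITY,
                    Comparator.comparingLong((TimestampedRecord r) -> r.timestamp));

    /** Buffers a record regardless of its arrival order. */
    void add(TimestampedRecord record) {
        priorityQueue.offer(record);
    }

    /** Removes and returns, in timestamp order, every record at or before the watermark. */
    List<TimestampedRecord> onWatermark(long watermark) {
        List<TimestampedRecord> ready = new ArrayList<>();
        while (!priorityQueue.isEmpty() && priorityQueue.peek().timestamp <= watermark) {
            ready.add(priorityQueue.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        OutOfOrderBuffer buffer = new OutOfOrderBuffer();
        buffer.add(new TimestampedRecord(3L, "c"));
        buffer.add(new TimestampedRecord(1L, "a"));
        buffer.add(new TimestampedRecord(2L, "b"));
        for (TimestampedRecord r : buffer.onWatermark(2L)) {
            System.out.println(r.timestamp + " " + r.value);
        }
    }
}
```

With a watermark of 2, the records stamped 1 and 2 are released in that order and the record stamped 3 stays buffered until a later watermark arrives.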
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485180#comment-15485180 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78447005

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/operator/AbstractSiddhiOperator.java ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi.operator;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.StreamSchema;
    +import org.apache.flink.core.fs.FSDataInputStream;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.streaming.api.TimeCharacteristic;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.StreamTask;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.wso2.siddhi.core.ExecutionPlanRuntime;
    +import org.wso2.siddhi.core.SiddhiManager;
    +import org.wso2.siddhi.core.stream.input.InputHandler;
    +import org.wso2.siddhi.query.api.definition.AbstractDefinition;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.PriorityQueue;
    +
    +public abstract class AbstractSiddhiOperator extends AbstractStreamOperator implements OneInputStreamOperator {
    +    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSiddhiOperator.class);
    +    private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
    +
    +    private final SiddhiOperatorContext siddhiPlan;
    +    private final String executionExpression;
    +    private final boolean isProcessingTime;
    +    private final Map streamRecordSerializers;
    +
    +    private transient SiddhiManager siddhiManager;
    +    private transient ExecutionPlanRuntime siddhiRuntime;
    +    private transient Map inputStreamHandlers;
    +
    +    // queue to buffer out of order stream records
    +    private transient PriorityQueue priorityQueue;
    +
    +    /**
    +     * @param siddhiPlan Siddhi CEP Execution Plan
    +     */
    +    public AbstractSiddhiOperator(SiddhiOperatorContext siddhiPlan) {
    +        validate(siddhiPlan);
    +        this.executionExpression = siddhiPlan.getFinalExecutionPlan();
    +        this.siddhiPlan = siddhiPlan;
    +        this.isProcessingTime = this.siddhiPlan.getTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
    +        this.streamRecordSerializers = new HashMap<>();
    +
    +        for (String streamId : this.siddhiPlan.getInputStreams()) {
    --- End diff --

    This can be moved into a separate method.
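The suggestion to move the serializer loop out of the constructor could look roughly like the sketch below. It uses simplified stand-ins rather than the pull request's types: `ExtractMethodSketch` is a hypothetical class, the serializers are plain strings instead of `MultiplexingStreamRecordSerializer` instances, and both helpers are static here, whereas `createStreamRecordSerializer` is an abstract instance method in the quoted code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the "move the loop into a separate method" refactoring. */
final class ExtractMethodSketch {

    private final Map<String, String> streamRecordSerializers;

    ExtractMethodSketch(List<String> inputStreams) {
        // The constructor now reads as a single step; the loop lives in the helper below.
        this.streamRecordSerializers = createStreamRecordSerializers(inputStreams);
    }

    /** Builds one serializer per registered input stream id. */
    private static Map<String, String> createStreamRecordSerializers(List<String> inputStreams) {
        Map<String, String> serializers = new HashMap<>();
        for (String streamId : inputStreams) {
            serializers.put(streamId, createStreamRecordSerializer(streamId));
        }
        return serializers;
    }

    /** Stand-in for the real serializer factory; returns a descriptive string only. */
    private static String createStreamRecordSerializer(String streamId) {
        return "serializer-for-" + streamId;
    }

    Map<String, String> getStreamRecordSerializers() {
        return streamRecordSerializers;
    }

    public static void main(String[] args) {
        ExtractMethodSketch sketch =
                new ExtractMethodSketch(Arrays.asList("inputStream1", "inputStream2"));
        System.out.println(sketch.getStreamRecordSerializers());
    }
}
```

Returning the map from the helper, instead of mutating a field inside it, keeps the field `final` and makes the helper easy to test on its own.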
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485176#comment-15485176 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446790

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiStream.java ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.contrib.siddhi.operator.SiddhiOperatorContext;
    +import org.apache.flink.contrib.siddhi.utils.SiddhiStreamFactory;
    +import org.apache.flink.contrib.siddhi.utils.SiddhiTypeFactory;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import java.util.Arrays;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP API Interface
    + */
    +@PublicEvolving
    +public abstract class SiddhiStream {
    +    private final SiddhiCEP environment;
    +
    +    public SiddhiStream(SiddhiCEP environment) {
    --- End diff --

    Could we name this `cepEnvironment`?
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485174#comment-15485174 ]

ASF GitHub Bot commented on FLINK-4520:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446628

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    --- End diff --

    I think this class would benefit from more JavaDocs.
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485172#comment-15485172 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446582

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams() {
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId) {
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions() {
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if (!isStreamDefined(streamId)) {
    +            throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +        this.executionEnvironment = streamExecutionEnvironment;
    --- End diff --

    The same comment is for other public methods of this class.

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485169#comment-15485169 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446460

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams() {
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId) {
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions() {
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if (!isStreamDefined(streamId)) {
    +            throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +        this.executionEnvironment = streamExecutionEnvironment;
    --- End diff --

    I would suggest using the `Preconditions` class to check the input.

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
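The `Preconditions` suggestion in the review comment above can be sketched as follows. This is a minimal, self-contained illustration, not the PR's code: the class `CheckedSiddhiEnvironment` and the local `checkNotNull` helper are hypothetical stand-ins, and it is an assumption that the reviewer means a `checkNotNull`-style helper such as the one in Flink's `org.apache.flink.util.Preconditions`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the class under review; not the PR's code.
class CheckedSiddhiEnvironment {

    // Local stand-in for a Preconditions-style helper (assumption: the reviewer
    // has a checkNotNull-like method in mind). Rejects null with a clear message.
    static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage);
        }
        return reference;
    }

    private final Object executionEnvironment;
    private final Map<String, Object> dataStreams = new HashMap<>();

    // The constructor fails fast instead of storing a null environment.
    CheckedSiddhiEnvironment(Object executionEnvironment) {
        this.executionEnvironment =
            checkNotNull(executionEnvironment, "executionEnvironment must not be null");
    }

    Object getExecutionEnvironment() {
        return executionEnvironment;
    }

    // Other public methods validate their arguments in the same way.
    void registerStream(String streamId, Object dataStream) {
        checkNotNull(streamId, "streamId must not be null");
        checkNotNull(dataStream, "dataStream must not be null");
        dataStreams.put(streamId, dataStream);
    }

    boolean isStreamDefined(String streamId) {
        return dataStreams.containsKey(checkNotNull(streamId, "streamId must not be null"));
    }
}
```

With checks like these, a null argument is reported at the call that introduced it rather than later, when the stored value is first dereferenced.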
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15485167#comment-15485167 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78446343

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,116 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams = new HashMap<>();
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas = new HashMap<>();
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams() {
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas() {
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId) {
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions() {
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if (!isStreamDefined(streamId)) {
    +            throw new UndefinedStreamException("Stream (streamId: " + streamId + ") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    --- End diff --

    Could you put the constructor after the field definitions?

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479245#comment-15479245 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2487

    @apivovarov thanks very much for the comments. I have formatted all code as required. Please kindly continue reviewing.

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479106#comment-15479106 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270764

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams;
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams(){
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId){
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions(){
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if(!isStreamDefined(streamId)){
    +            throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +        this.executionEnvironment = streamExecutionEnvironment;
    +        this.dataStreams = new HashMap<>();
    +        this.dataStreamSchemas = new HashMap<>();
    +    }
    +
    +    public static SiddhiStream.SingleSiddhiStream define(String streamId, DataStream inStream, String... fieldNames) {
    +        SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +        return environment.from(streamId,inStream,fieldNames);
    +    }
    +
    +    public SiddhiStream.SingleSiddhiStream from(String streamId, DataStream inStream, String... fieldNames){
    +        this.registerStream(streamId,inStream,fieldNames);
    +        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +    }
    +
    +    public SiddhiStream.SingleSiddhiStream from(String streamId){
    +        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +    }
    +
    +    public SiddhiStream.UnionSiddhiStream union(String firstStreamId,String ... unionStreamIds){
    +        return new SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
    +    }
    +
    +    public void registerStream(final String streamId, DataStream dataStream, String... fieldNames) {
    +        if (isStreamDefined(streamId)) {
    +            throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +        }
    +        dataStreams.put(streamId, dataStream);
    +        SiddhiStreamSchema schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +        schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +        dataStreamSchemas.put(streamId, schema);
    +    }
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479103#comment-15479103 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270757

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams;
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams(){
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId){
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions(){
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if(!isStreamDefined(streamId)){
    +            throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +        this.executionEnvironment = streamExecutionEnvironment;
    +        this.dataStreams = new HashMap<>();
    +        this.dataStreamSchemas = new HashMap<>();
    +    }
    +
    +    public static SiddhiStream.SingleSiddhiStream define(String streamId, DataStream inStream, String... fieldNames) {
    +        SiddhiCEP environment = SiddhiCEP.getSiddhiEnvironment(inStream.getExecutionEnvironment());
    +        return environment.from(streamId,inStream,fieldNames);
    +    }
    +
    +    public SiddhiStream.SingleSiddhiStream from(String streamId, DataStream inStream, String... fieldNames){
    +        this.registerStream(streamId,inStream,fieldNames);
    +        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +    }
    +
    +    public SiddhiStream.SingleSiddhiStream from(String streamId){
    +        return new SiddhiStream.SingleSiddhiStream<>(streamId, this);
    +    }
    +
    +    public SiddhiStream.UnionSiddhiStream union(String firstStreamId,String ... unionStreamIds){
    +        return new SiddhiStream.SingleSiddhiStream(firstStreamId,this).union(unionStreamIds);
    +    }
    +
    +    public void registerStream(final String streamId, DataStream dataStream, String... fieldNames) {
    +        if (isStreamDefined(streamId)) {
    +            throw new DuplicatedStreamException("Input stream: " + streamId + " already exists");
    +        }
    +        dataStreams.put(streamId, dataStream);
    +        SiddhiStreamSchema schema = new SiddhiStreamSchema<>(dataStream.getType(), fieldNames);
    +        schema.setTypeSerializer(schema.getTypeInfo().createSerializer(dataStream.getExecutionConfig()));
    +        dataStreamSchemas.put(streamId, schema);
    +    }
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15479102#comment-15479102 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user apivovarov commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2487#discussion_r78270750

    --- Diff: flink-contrib/flink-siddhi/src/main/java/org/apache/flink/contrib/siddhi/SiddhiCEP.java ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.contrib.siddhi;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.contrib.siddhi.exception.DuplicatedStreamException;
    +import org.apache.flink.contrib.siddhi.exception.UndefinedStreamException;
    +import org.apache.flink.contrib.siddhi.schema.SiddhiStreamSchema;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +/**
    + * Siddhi CEP Execution Environment
    + */
    +@PublicEvolving
    +public class SiddhiCEP {
    +    private final StreamExecutionEnvironment executionEnvironment;
    +    private final Map<String, DataStream<?>> dataStreams;
    +    private final Map<String, SiddhiStreamSchema<?>> dataStreamSchemas;
    +    private final Map<String, Class<?>> extensions = new HashMap<>();
    +
    +    public Map<String, DataStream<?>> getDataStreams(){
    +        return this.dataStreams;
    +    }
    +
    +    public Map<String, SiddhiStreamSchema<?>> getDataStreamSchemas(){
    +        return this.dataStreamSchemas;
    +    }
    +
    +    public boolean isStreamDefined(String streamId){
    +        return dataStreams.containsKey(streamId);
    +    }
    +
    +    public Map<String, Class<?>> getExtensions(){
    +        return this.extensions;
    +    }
    +
    +    public void checkStreamDefined(String streamId) throws UndefinedStreamException {
    +        if(!isStreamDefined(streamId)){
    +            throw new UndefinedStreamException("Stream (streamId: "+streamId+") not defined");
    +        }
    +    }
    +
    +    public SiddhiCEP(StreamExecutionEnvironment streamExecutionEnvironment) {
    +        this.executionEnvironment = streamExecutionEnvironment;
    +        this.dataStreams = new HashMap<>();
    +        this.dataStreamSchemas = new HashMap<>();
    --- End diff --

    Lines 64 and 65 can be removed. Add `= new HashMap<>();` to lines 36 and 37, similar to what was done on line 38.

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
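The reviewer's point about initializing the maps at their declarations, rather than in the constructor, can be illustrated with a small sketch. The class `InlineInitializedEnvironment` and its `Object` value types are hypothetical simplifications chosen so the example compiles on its own; it is not the PR's `SiddhiCEP` class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the pattern under discussion; not the PR's code.
class InlineInitializedEnvironment {
    private final String environmentName;

    // Each map is created where it is declared, so all three fields follow the
    // same pattern and the constructor only handles the value passed in.
    private final Map<String, Object> dataStreams = new HashMap<>();
    private final Map<String, Object> dataStreamSchemas = new HashMap<>();
    private final Map<String, Class<?>> extensions = new HashMap<>();

    InlineInitializedEnvironment(String environmentName) {
        this.environmentName = environmentName;
    }

    String getEnvironmentName() {
        return environmentName;
    }

    Map<String, Object> getDataStreams() {
        return dataStreams;
    }

    Map<String, Object> getDataStreamSchemas() {
        return dataStreamSchemas;
    }

    Map<String, Class<?>> getExtensions() {
        return extensions;
    }
}
```

Field initializers run before the constructor body, so every constructor added later sees the maps already populated with empty instances, and the `final` fields cannot be left unassigned by mistake.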
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478991#comment-15478991 ]

Hao Chen commented on FLINK-4520:
---

Hi Flink community,

The PR is available now. Could anybody help assign the ticket to me and review the PR? Any response is highly appreciated.

Thanks,
Hao

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library, patch-available
> Fix For: 1.2.0
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478961#comment-15478961 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch commented on the issue:

    https://github.com/apache/flink/pull/2486

    Closed this PR to clean up the commit history and recreated it as PR #2487

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library
>
> h1. flink-siddhi proposal
> h2. Abstraction
> Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries.
> It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Features
> * Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
>   * Filter
>   * Join
>   * Aggregation
>   * Group by
>   * Having
>   * Window
>   * Conditions and Expressions
>   * Pattern processing
>   * Sequence processing
>   * Event Tables
>   ...
> * Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
>   * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
>   * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan
>   * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema
> * Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
> * Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)
> h2. Test Cases
> * org.apache.flink.contrib.siddhi.SiddhiCEPITCase: https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java
> h2. Example
> {code}
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
> cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
> cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
> cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
> DataStream> output = cep
>   .from("inputStream1").union("inputStream2")
>   .sql(
>     "from every s1 = inputStream1[id == 2] "
>      + " -> s2 = inputStream2[id == 3] "
>      + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
>      + " insert into outputStream"
>   )
>   .returns("outputStream");
> env.execute();
> {code}
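The feature list mentions registering a Flink DataStream together with its type information as a Siddhi stream schema. As a rough illustration of what such a mapping involves, the sketch below turns an ordered set of field names and Java types into a Siddhi `define stream` statement. The class `SiddhiSchemaSketch`, its methods, and the field types chosen for `id`, `name`, `price` and `timestamp` are assumptions made for this example; the actual flink-siddhi implementation may derive the schema differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not the flink-siddhi implementation. */
final class SiddhiSchemaSketch {

    /** Maps a Java type to a Siddhi attribute type name, falling back to "object". */
    static String siddhiType(Class<?> type) {
        if (type == Integer.class || type == int.class) return "int";
        if (type == Long.class || type == long.class) return "long";
        if (type == Float.class || type == float.class) return "float";
        if (type == Double.class || type == double.class) return "double";
        if (type == Boolean.class || type == boolean.class) return "bool";
        if (type == String.class) return "string";
        return "object";
    }

    /** Builds a Siddhi stream definition from field names and types, in insertion order. */
    static String defineStream(String streamId, Map<String, Class<?>> fields) {
        StringBuilder ddl = new StringBuilder("define stream ").append(streamId).append(" (");
        boolean first = true;
        for (Map.Entry<String, Class<?>> field : fields.entrySet()) {
            if (!first) {
                ddl.append(", ");
            }
            ddl.append(field.getKey()).append(' ').append(siddhiType(field.getValue()));
            first = false;
        }
        return ddl.append(");").toString();
    }

    public static void main(String[] args) {
        // Field types are assumed; the issue only names the fields.
        Map<String, Class<?>> fields = new LinkedHashMap<>();
        fields.put("id", Integer.class);
        fields.put("name", String.class);
        fields.put("price", Double.class);
        fields.put("timestamp", Long.class);
        System.out.println(defineStream("inputStream1", fields));
    }
}
```

A `LinkedHashMap` is used so that the attribute order in the generated definition matches the order in which the fields were registered.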
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478958#comment-15478958 ]

ASF GitHub Bot commented on FLINK-4520:
---

GitHub user haoch opened a pull request:

    https://github.com/apache/flink/pull/2487

    [FLINK-4520][flink-siddhi] Integrate Siddhi as a light-weight Streaming CEP Library

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

# Abstraction

Siddhi CEP is a lightweight and easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under `Apache Software License v2.0`. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries.

__It would be very helpful for Flink users (especially streaming application developers) to have a library that runs Siddhi CEP queries directly in a Flink streaming application.__

# Features

* Integrate Siddhi CEP as a stream operator (i.e. `TupleStreamSiddhiOperator`), supporting rich CEP features like
  * Filter
  * Join
  * Aggregation
  * Group by
  * Having
  * Window
  * Conditions and Expressions
  * Pattern processing
  * Sequence processing
  * Event Tables
  ...
* Provide easy-to-use Siddhi CEP API to integrate Flink DataStream API (See `SiddhiCEP` and `SiddhiStream`)
  * Register Flink DataStream associating native type information with Siddhi Stream Schema, supporting POJO, Tuple, Primitive Type, etc.
  * Connect with single or multiple Flink DataStreams with Siddhi CEP Execution Plan
  * Return output stream as DataStream with type intelligently inferred from Siddhi Stream Schema
* Integrate siddhi runtime state management with Flink state (See `AbstractSiddhiOperator`)
* Support siddhi plugin management to extend CEP functions. (See `SiddhiCEP#registerExtension`)

# Test Cases

* [`org.apache.flink.contrib.siddhi.SiddhiCEPITCase`](https://github.com/haoch/flink/blob/FLINK-4520/flink-contrib/flink-siddhi/src/test/java/org/apache/flink/contrib/siddhi/SiddhiCEPITCase.java)

# Example

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SiddhiCEP cep = SiddhiCEP.getSiddhiEnvironment(env);
    cep.registerExtension("custom:plus",CustomPlusFunctionExtension.class);
    cep.registerStream("inputStream1", input1, "id", "name", "price","timestamp");
    cep.registerStream("inputStream2", input2, "id", "name", "price","timestamp");
    DataStream> output = cep
      .from("inputStream1").union("inputStream2")
      .sql(
        "from every s1 = inputStream1[id == 2] "
         + " -> s2 = inputStream2[id == 3] "
         + "select s1.id as id_1, s1.name as name_1, s2.id as id_2, s2.name as name_2 , custom:plus(s1.price,s2.price) as price"
         + " insert into outputStream"
      )
      .returns("outputStream");
    env.execute();

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2487

commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen
Date: 2016-08-29T12:34:42Z

    Implement initial version of flink-siddhi stream operator

commit
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15478928#comment-15478928 ]

ASF GitHub Bot commented on FLINK-4520:
---

Github user haoch closed the pull request at:

    https://github.com/apache/flink/pull/2486
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15477941#comment-15477941 ]

ASF GitHub Bot commented on FLINK-4520:
---

GitHub user haoch opened a pull request:

    https://github.com/apache/flink/pull/2486

    FLINK-4520 Integrate Siddhi as a light-weight Streaming CEP Library

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haoch/flink FLINK-4520

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2486

commit 680739e1f1a7d2e12a1d1ba4f1cc1ea8494002e0
Author: Hao Chen
Date: 2016-08-29T12:34:42Z

    Implement initial version of flink-siddhi stream operator

commit d8b131d9204e9e359afae80087afe2aecab27eaf
[jira] [Commented] (FLINK-4520) Integrate Siddhi as a lightweight CEP Library
[ https://issues.apache.org/jira/browse/FLINK-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15449216#comment-15449216 ]

Hao Chen commented on FLINK-4520:
---

If there are no further problems, I would like to volunteer to work on this feature.

> Integrate Siddhi as a lightweight CEP Library
> ---
>
> Key: FLINK-4520
> URL: https://issues.apache.org/jira/browse/FLINK-4520
> Project: Flink
> Issue Type: New Feature
> Components: CEP
> Affects Versions: 1.2.0
> Reporter: Hao Chen
> Labels: cep, library
>
> h1. Flink Siddhi CEP Integration Proposal
> h2. About Siddhi CEP
> Siddhi CEP is a lightweight, easy-to-use Open Source Complex Event Processing Engine (CEP) released as a Java Library under Apache Software License v2.0. Siddhi CEP processes events which are generated by various event sources, analyses them and notifies appropriate complex events according to the user specified queries.
> * http://wso2.com/products/complex-event-processor/
> * https://github.com/wso2/siddhi
> h2. Proposal
> Siddhi is a very lightweight, feature-rich CEP library that supports most traditional CEP use cases, such as:
> * Filter
> * Join
> * Aggregation
> * Group by
> * Having
> * Window
> * Conditions and Expressions
> * Pattern processing
> * Sequence processing
> * Event Tables
> * Partitions
> * Scripting: Support JavaScript & Scala Scripts within Siddhi Queries
> * Query: SQL like query language
> The proposal is to
> * Embed siddhi CEP as a stream operator of Flink
> * Support native siddhi queries and extensions running inside Flink StreamExecutionEnvironment
> * Integrate state management.
> * Provide consistent DSL to integrate with Flink programming API.
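The "Integrate state management" item can be pictured as follows: the embedded CEP engine exposes its internal state as bytes, the hosting operator stores those bytes when a checkpoint is taken, and a fresh engine is restored from them on recovery. The sketch below shows only that round trip with a toy engine. `SnapshotableEngine` and `CountingEngine` are hypothetical stand-ins invented for this illustration; neither Siddhi's runtime API nor Flink's state API is used here, and the real integration may look quite different.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of snapshot/restore; not Siddhi or Flink API. */
final class StateIntegrationSketch {

    /** Stand-in for an embedded engine whose state can be captured as bytes. */
    interface SnapshotableEngine {
        void process(long event);
        byte[] snapshot();
        void restore(byte[] state);
    }

    /** Toy engine whose only state is the number of events seen so far. */
    static final class CountingEngine implements SnapshotableEngine {
        private long count;

        @Override
        public void process(long event) {
            count++;
        }

        @Override
        public byte[] snapshot() {
            // Serialize the counter into a fixed-size byte array.
            return ByteBuffer.allocate(Long.BYTES).putLong(count).array();
        }

        @Override
        public void restore(byte[] state) {
            count = ByteBuffer.wrap(state).getLong();
        }

        long count() {
            return count;
        }
    }

    public static void main(String[] args) {
        CountingEngine beforeFailure = new CountingEngine();
        beforeFailure.process(10L);
        beforeFailure.process(20L);

        // The hosting operator would persist these bytes at checkpoint time.
        byte[] checkpoint = beforeFailure.snapshot();

        // After a failure, a new engine instance resumes from the stored bytes.
        CountingEngine afterRecovery = new CountingEngine();
        afterRecovery.restore(checkpoint);
        System.out.println("restored count = " + afterRecovery.count());
    }
}
```

Treating the engine state as an opaque byte array keeps the host framework independent of the engine's internal data structures, at the cost of not being able to inspect or repartition that state.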