[jira] [Closed] (FLINK-12744) ML common parameters
[ https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-12744.
Resolution: Fixed
Fix Version/s: 1.9.0

> Key: FLINK-12744
> URL: https://issues.apache.org/jira/browse/FLINK-12744
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Xu Yang
> Assignee: Xu Yang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 10m
> Remaining Estimate: 0h
>
> We defined some commonly used parameters for machine-learning algorithms:
> - *add ML common parameters*
> - *change the behavior when using the default constructor of the param factory*
> - *add shared params in the ml package*
> - *add the flink-ml module*

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12744) ML common parameters
[ https://issues.apache.org/jira/browse/FLINK-12744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879343#comment-16879343 ]

Shaoxuan Wang commented on FLINK-12744:
Fixed in master: f18481ee54b61a737ae1426ff4dcaf7006e0edbd
[jira] [Closed] (FLINK-12758) Add flink-ml-lib module
[ https://issues.apache.org/jira/browse/FLINK-12758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-12758.
Resolution: Fixed

> Key: FLINK-12758
> URL: https://issues.apache.org/jira/browse/FLINK-12758
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Luo Gen
> Assignee: Luo Gen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 20m
> Remaining Estimate: 0h
>
> This Jira introduces a new module, "flink-ml-lib", under flink-ml-parent. flink-ml-lib is planned in the FLIP-39 roadmap as the code base for the library implementations of FlinkML. This Jira only aims to create the module; algorithms will be added in separate Jiras in the future.
> For more details, please refer to the [FLIP39 design doc|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo]
[jira] [Commented] (FLINK-12758) Add flink-ml-lib module
[ https://issues.apache.org/jira/browse/FLINK-12758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879055#comment-16879055 ]

Shaoxuan Wang commented on FLINK-12758:
Fixed in master: 726d9e49905e893659a1f7b0ba83a0a59bec8fac
[jira] [Closed] (FLINK-12597) Remove the legacy flink-libraries/flink-ml
[ https://issues.apache.org/jira/browse/FLINK-12597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-12597.
Resolution: Fixed
Fix Version/s: 1.9.0

> Key: FLINK-12597
> URL: https://issues.apache.org/jira/browse/FLINK-12597
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Affects Versions: 1.9.0
> Reporter: Shaoxuan Wang
> Assignee: Luo Gen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 20m
> Remaining Estimate: 0h
>
> As discussed on the dev mailing list, we decided to delete the legacy flink-libraries/flink-ml, as well as flink-libraries/flink-ml-uber. There is no further development planned for this legacy flink-ml package in 1.9 or in the future. Users whose products/projects still rely on this package can simply use the 1.8 version.
> [1] [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html]
[jira] [Commented] (FLINK-12597) Remove the legacy flink-libraries/flink-ml
[ https://issues.apache.org/jira/browse/FLINK-12597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16879052#comment-16879052 ]

Shaoxuan Wang commented on FLINK-12597:
Fixed in master: 2b2a83df56242aa90ee731f25d17b050b75df0f3
[jira] [Closed] (FLINK-12881) Add more functionalities for ML Params and ParamInfo class
[ https://issues.apache.org/jira/browse/FLINK-12881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-12881.
Resolution: Fixed
Fix Version/s: 1.9.0

> Key: FLINK-12881
> URL: https://issues.apache.org/jira/browse/FLINK-12881
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Xu Yang
> Assignee: Xu Yang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Time Spent: 20m
> Remaining Estimate: 0h
>
> 1. Change Params#get to support aliases:
> * If the Params contains the specific parameter under both its name and an alias, so that it has more than one value, an exception is thrown.
> * If the Params doesn't contain the specific parameter, and the ParamInfo is optional but has no default value, an exception is thrown.
> * When isOptional is true and contains(ParamInfo) is true, get returns the value found in the Params, whether that value is null or not. When isOptional is true and contains(ParamInfo) is false: if hasDefaultValue is true, get returns the default value; if hasDefaultValue is false, an exception is thrown. Developers should use contains to check whether the Params holds a value for a ParamInfo.
> 2. Add size, clear, isEmpty, contains and fromJson to Params.
> 3. Fix ParamInfo and PipelineStage to adapt to the new Params:
> * add a null assertion for aliases
> * use Params#loadJson in PipelineStage
> 4. Add test cases for aliases.
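The alias-resolution rules in item 1 are easier to follow in code. The sketch below is a minimal, self-contained Java re-implementation of that lookup logic; MiniParams and MiniParamInfo are made-up stand-ins for illustration, not the actual Flink Params/ParamInfo classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for the described Params lookup semantics.
class MiniParams {
    private final Map<String, Object> values = new HashMap<>();

    void set(String name, Object value) { values.put(name, value); }
    int size() { return values.size(); }
    boolean isEmpty() { return values.isEmpty(); }
    void clear() { values.clear(); }

    boolean contains(MiniParamInfo info) {
        if (values.containsKey(info.name)) return true;
        for (String alias : info.aliases) {
            if (values.containsKey(alias)) return true;
        }
        return false;
    }

    Object get(MiniParamInfo info) {
        Object found = null;
        int hits = 0;
        // Collect the value under the canonical name and every alias.
        if (values.containsKey(info.name)) { found = values.get(info.name); hits++; }
        for (String alias : info.aliases) {
            if (values.containsKey(alias)) { found = values.get(alias); hits++; }
        }
        if (hits > 1) {
            // Name and alias both set: more than one value, ambiguous.
            throw new IllegalStateException("More than one value for " + info.name);
        }
        if (hits == 1) return found;          // may legitimately be null
        if (info.hasDefaultValue) return info.defaultValue;
        // Missing parameter without a default value: error.
        throw new IllegalArgumentException("No value for " + info.name);
    }
}

// Illustrative stand-in for ParamInfo.
class MiniParamInfo {
    final String name;
    final String[] aliases;
    final boolean isOptional;
    final boolean hasDefaultValue;
    final Object defaultValue;

    MiniParamInfo(String name, String[] aliases, boolean isOptional,
                  boolean hasDefaultValue, Object defaultValue) {
        this.name = name;
        this.aliases = aliases;
        this.isOptional = isOptional;
        this.hasDefaultValue = hasDefaultValue;
        this.defaultValue = defaultValue;
    }
}
```

A caller would thus see the default until either the name or an alias is set, and an exception only when both are set at once or when nothing is set and no default exists.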
[jira] [Commented] (FLINK-12881) Add more functionalities for ML Params and ParamInfo class
[ https://issues.apache.org/jira/browse/FLINK-12881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16875719#comment-16875719 ]

Shaoxuan Wang commented on FLINK-12881:
Fixed in master: 1660c6b47af789fa5c9bf6a3ff77e868ca90
[jira] [Issue Comment Deleted] (FLINK-12470) FLIP39: Flink ML pipeline and ML libs
[ https://issues.apache.org/jira/browse/FLINK-12470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang updated FLINK-12470:
Comment: was deleted (was: Fixed in master: 1660c6b47af789fa5c9bf6a3ff77e868ca90)

> Key: FLINK-12470
> URL: https://issues.apache.org/jira/browse/FLINK-12470
> Project: Flink
> Issue Type: New Feature
> Components: Library / Machine Learning
> Affects Versions: 1.9.0
> Reporter: Shaoxuan Wang
> Assignee: Shaoxuan Wang
> Priority: Major
> Fix For: 1.9.0
> Original Estimate: 720h
> Remaining Estimate: 720h
>
> This is the umbrella Jira for FLIP39, which intends to enhance the scalability and the ease of use of Flink ML.
> ML discussion thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-39-Flink-ML-pipeline-and-ML-libs-td28633.html]
> Google Doc (will be converted to an official Confluence page very soon): [https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo/edit]
> In machine learning there are mainly two types of people. The first type is MLlib developers. They need a set of standard, well-abstracted core ML APIs to implement the algorithms; every ML algorithm is a concrete implementation on top of these APIs. The second type is MLlib users, who utilize the existing, packaged MLlib to train or serve a model. It is pretty common that the entire training or inference is constructed as a sequence of transformations or algorithms, so it is essential to provide a workflow/pipeline API such that MLlib users can easily combine multiple algorithms to describe the ML workflow/pipeline.
> Flink currently has a set of ML core interfaces, but they are built on top of the DataSet API. This does not align with the latest Flink [roadmap|https://flink.apache.org/roadmap.html] (the Table API will become the first-class citizen and primary API for analytics use cases, while the DataSet API will be gradually deprecated). Moreover, Flink at present does not have any interface that allows MLlib users to describe an ML workflow/pipeline, nor does it provide any approach to persist a pipeline or model and reuse them in the future. To solve/improve these issues, in this FLIP we propose to:
> * Provide a new set of ML core interfaces (on top of the Flink Table API)
> * Provide an ML pipeline interface (on top of the Flink Table API)
> * Provide interfaces for parameter management and pipeline persistence
> * All the above interfaces should facilitate any new ML algorithm. We will gradually add various standard ML algorithms on top of these new interfaces to ensure their feasibility and scalability.
[jira] [Commented] (FLINK-12470) FLIP39: Flink ML pipeline and ML libs
[ https://issues.apache.org/jira/browse/FLINK-12470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16875717#comment-16875717 ]

Shaoxuan Wang commented on FLINK-12470:
Fixed in master: 1660c6b47af789fa5c9bf6a3ff77e868ca90
[jira] [Closed] (FLINK-12473) Add the interface of ML pipeline and ML lib
[ https://issues.apache.org/jira/browse/FLINK-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-12473.
Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in master: 305095743ffe0bc39f76c1bda983da7d0df9e003

> Key: FLINK-12473
> URL: https://issues.apache.org/jira/browse/FLINK-12473
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Shaoxuan Wang
> Assignee: Luo Gen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
> Attachments: image-2019-05-10-12-50-15-869.png
> Original Estimate: 168h
> Time Spent: 0.5h
> Remaining Estimate: 167.5h
>
> This Jira introduces the major interfaces for the ML pipeline and ML lib. The major interfaces and their relationship diagram are shown below. For more details, please refer to the [FLIP39 design doc|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo]
>
> !image-2019-05-10-12-50-15-869.png!
[jira] [Created] (FLINK-12597) Remove the legacy flink-libraries/flink-ml
Shaoxuan Wang created FLINK-12597:
Summary: Remove the legacy flink-libraries/flink-ml
Key: FLINK-12597
URL: https://issues.apache.org/jira/browse/FLINK-12597
Project: Flink
Issue Type: Sub-task
Components: Library / Machine Learning
Affects Versions: 1.9.0
Reporter: Shaoxuan Wang
Assignee: Luo Gen
[jira] [Assigned] (FLINK-12473) Add the interface of ML pipeline and ML lib
[ https://issues.apache.org/jira/browse/FLINK-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang reassigned FLINK-12473:
Assignee: Luo Gen
[jira] [Updated] (FLINK-12473) Add the interface of ML pipeline and ML lib
[ https://issues.apache.org/jira/browse/FLINK-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang updated FLINK-12473:
Summary: Add the interface of ML pipeline and ML lib (was: Add ML pipeline and ML lib Core API)
[jira] [Created] (FLINK-12473) Add ML pipeline and ML lib Core API
Shaoxuan Wang created FLINK-12473:
Summary: Add ML pipeline and ML lib Core API
Key: FLINK-12473
URL: https://issues.apache.org/jira/browse/FLINK-12473
Project: Flink
Issue Type: Sub-task
Components: Library / Machine Learning
Reporter: Shaoxuan Wang
Attachments: image-2019-05-10-12-50-15-869.png
[jira] [Closed] (FLINK-11115) Port some flink.ml algorithms to table based
[ https://issues.apache.org/jira/browse/FLINK-11115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11115.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11115
> URL: https://issues.apache.org/jira/browse/FLINK-11115
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> This sub-task is to port some flink.ml algorithms to be table based, to verify the correctness of the design and implementation.
[jira] [Closed] (FLINK-11114) Support wrapping inference pipeline as a UDF function in SQL
[ https://issues.apache.org/jira/browse/FLINK-11114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11114.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11114
> URL: https://issues.apache.org/jira/browse/FLINK-11114
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> Though the standard ML pipeline inference usage is table based (that is, the user constructs the DAG at the client using only the Table API), it is also desirable to wrap the inference logic as a UDF to be used in SQL. This means it must be record based.
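The table-based versus record-based gap described above boils down to a small adapter. The following is a hedged, dependency-free Java sketch of that idea; UdfAdapter, BatchTransformer and SUM_PIPELINE are invented names for illustration, not part of Flink's UDF or pipeline API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative adapter from a batch (table-level) transformer to a
// per-record function, as a SQL scalar UDF would require.
class UdfAdapter {
    // Table-level view: a whole batch of rows in, one score per row out.
    interface BatchTransformer {
        List<Double> transform(List<double[]> rows);
    }

    // Wrap the batch transformer as a per-record function by feeding it
    // a single-row batch and unwrapping the single result.
    static Function<double[], Double> asUdf(BatchTransformer t) {
        return row -> t.transform(Arrays.asList(row)).get(0);
    }

    // Toy stand-in for a fitted pipeline: scores a row as the sum of fields.
    static final BatchTransformer SUM_PIPELINE = rows -> {
        List<Double> out = new java.util.ArrayList<>();
        for (double[] r : rows) {
            double s = 0.0;
            for (double v : r) s += v;
            out.add(s);
        }
        return out;
    };
}
```

A real integration would register the resulting function with the SQL engine; per-record invocation is what makes the pipeline usable inside a SQL expression.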
[jira] [Closed] (FLINK-11113) Support periodically update models when inferencing
[ https://issues.apache.org/jira/browse/FLINK-11113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11113.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11113
> URL: https://issues.apache.org/jira/browse/FLINK-11113
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> As models will be periodically updated, and the inference job may run on a stream and never finish, it is important that the inference job can periodically reload the latest model for inference without being restarted.
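The reload-without-restart requirement above is essentially a hot-swap pattern: the scoring path reads the current model through an atomic reference while a background task replaces it. A minimal sketch, with ReloadableModel and its linear score as made-up stand-ins (not Flink API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative long-running scorer whose model can be swapped while serving.
class ReloadableModel {
    private final AtomicReference<double[]> weights = new AtomicReference<>();

    ReloadableModel(double[] initial) { weights.set(initial); }

    // Called by a periodic refresh task (e.g. a ScheduledExecutorService);
    // readers see either the old or the new model, never a mix.
    void reload(double[] fresh) { weights.set(fresh); }

    // Called per record on the scoring path of the never-ending job.
    double score(double[] features) {
        double[] w = weights.get();   // one consistent snapshot per call
        double s = 0.0;
        for (int i = 0; i < w.length; i++) s += w[i] * features[i];
        return s;
    }
}
```

The key design point is that each score call takes a single snapshot of the model, so a reload between two records never corrupts an in-flight computation.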
[jira] [Closed] (FLINK-11111) Create a new set of parameters
[ https://issues.apache.org/jira/browse/FLINK-11111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11111.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11111
> URL: https://issues.apache.org/jira/browse/FLINK-11111
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> One goal of the new Table-based ML Pipeline is to be easy for tooling. That is, any ML/AI algorithm adapting to this ML Pipeline standard shall declare all its parameters via a well-defined interface, so that an AI platform can uniformly get/set the corresponding parameters while remaining agnostic about the details of the specific algorithm. The only difference between algorithms, from a user's perspective, is the name; all the other algorithm parameters are self-descriptive.
>
> This will also be useful for future Flink ML SQL, as the SQL parser can uniformly handle all these parameters, which greatly simplifies the parser.
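As a rough illustration of what such a self-descriptive parameter declaration could look like, here is a small builder-style sketch. All names (ParamSpec, named, withDescription, withDefault) are hypothetical, chosen only to mirror the spirit of the eventual ParamInfo-style interface, not the actual Flink API.

```java
// Illustrative self-descriptive parameter declaration: a platform can read
// name, description, optionality and default without knowing the algorithm.
final class ParamSpec<T> {
    final String name;
    final String description;
    final boolean optional;
    final T defaultValue;

    private ParamSpec(String name, String description, boolean optional, T defaultValue) {
        this.name = name;
        this.description = description;
        this.optional = optional;
        this.defaultValue = defaultValue;
    }

    static <T> Builder<T> named(String name) { return new Builder<>(name); }

    static final class Builder<T> {
        private final String name;
        private String description = "";
        private boolean optional = true;
        private T defaultValue;

        private Builder(String name) { this.name = name; }
        Builder<T> withDescription(String d) { this.description = d; return this; }
        Builder<T> required() { this.optional = false; return this; }
        Builder<T> withDefault(T v) { this.defaultValue = v; return this; }
        ParamSpec<T> build() { return new ParamSpec<>(name, description, optional, defaultValue); }
    }
}
```

Because every declaration carries its own metadata, a generic tool (or a future ML SQL parser) can enumerate and validate parameters uniformly, with the algorithm name as the only algorithm-specific input.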
[jira] [Closed] (FLINK-11112) Support pipeline import/export
[ https://issues.apache.org/jira/browse/FLINK-11112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11112.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11112
> URL: https://issues.apache.org/jira/browse/FLINK-11112
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> It is quite common to have one job for training that periodically exports the trained pipeline and models, and another job that loads these exported pipelines for inference.
>
> Thus, we need functionality for pipeline import/export. This shall work in both streaming and batch environments.
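The export/import round trip described above can be sketched with a single made-up stage. The FLIP-39 design persists pipelines as JSON; here a simple key=value text stands in for the format so the sketch needs no dependencies, and ScaleModel is purely illustrative, not a Flink class.

```java
// Illustrative fitted stage whose state survives an export/load round trip.
class ScaleModel {
    final double factor;   // the fitted state to persist

    ScaleModel(double factor) { this.factor = factor; }

    double transform(double x) { return x * factor; }

    // Training job side: export the fitted state as text.
    String export() {
        return "stage=ScaleModel\nfactor=" + factor;
    }

    // Inference job side: rebuild the stage from the exported text.
    static ScaleModel load(String text) {
        String stage = null;
        double factor = Double.NaN;
        for (String line : text.split("\n")) {
            String[] kv = line.split("=", 2);
            if (kv[0].equals("stage")) stage = kv[1];
            if (kv[0].equals("factor")) factor = Double.parseDouble(kv[1]);
        }
        if (!"ScaleModel".equals(stage)) {
            throw new IllegalArgumentException("unexpected stage: " + stage);
        }
        return new ScaleModel(factor);
    }
}
```

A full pipeline would simply concatenate one such record per stage, which is why the same mechanism works regardless of whether the loading job is streaming or batch.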
[jira] [Closed] (FLINK-11109) Create Table based optimizers
[ https://issues.apache.org/jira/browse/FLINK-11109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11109.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11109
> URL: https://issues.apache.org/jira/browse/FLINK-11109
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> The existing optimizers in the org.apache.flink.ml package are DataSet based. This task is to create a new set of optimizers that are table based.
[jira] [Closed] (FLINK-11108) Create a new set of table based ML Pipeline classes
[ https://issues.apache.org/jira/browse/FLINK-11108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11108.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11108
> URL: https://issues.apache.org/jira/browse/FLINK-11108
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> The main classes are:
> # PipelineStage (the trait for each pipeline stage)
> # Estimator (the training stage)
> # Transformer (the inference/feature-engineering stage)
> # Pipeline (the whole pipeline)
> # Predictor (extends Estimator, for supervised learning)
> The detailed design can be found in the parent Jira's design document.
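The relationship between these classes can be sketched in a few lines of Java. This is a hedged, self-contained toy version using List&lt;double[]&gt; where the real design operates on Flink Tables; the class names follow the list above but the signatures (and the MaxScaler example) are illustrative, not the actual FLIP-39 API.

```java
import java.util.ArrayList;
import java.util.List;

interface PipelineStage {}                      // trait shared by all stages

interface Transformer extends PipelineStage {   // inference / feature engineering
    List<double[]> transform(List<double[]> data);
}

interface Estimator extends PipelineStage {     // training stage
    Transformer fit(List<double[]> data);
}

// The Pipeline is itself an Estimator: fitting it walks the stages in
// order, fits each Estimator into a Transformer, and chains the results.
class Pipeline implements Estimator {
    private final List<PipelineStage> stages = new ArrayList<>();

    Pipeline appendStage(PipelineStage stage) { stages.add(stage); return this; }

    public Transformer fit(List<double[]> data) {
        List<Transformer> fitted = new ArrayList<>();
        List<double[]> current = data;
        for (PipelineStage stage : stages) {
            Transformer t = (stage instanceof Estimator)
                    ? ((Estimator) stage).fit(current)
                    : (Transformer) stage;
            current = t.transform(current);
            fitted.add(t);
        }
        // The fitted pipeline is a Transformer that applies every stage in order.
        return in -> {
            List<double[]> out = in;
            for (Transformer t : fitted) out = t.transform(out);
            return out;
        };
    }
}

// Toy estimator: learns the max of column 0 and scales that column by it.
class MaxScaler implements Estimator {
    public Transformer fit(List<double[]> data) {
        double max = 1e-12;   // guard against an all-zero column
        for (double[] row : data) max = Math.max(max, Math.abs(row[0]));
        final double m = max;
        return in -> {
            List<double[]> out = new ArrayList<>();
            for (double[] row : in) out.add(new double[]{row[0] / m});
            return out;
        };
    }
}
```

Because Pipeline implements Estimator, pipelines nest naturally; a Predictor would simply be an Estimator sub-interface whose fitted Transformer produces predictions.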
[jira] [Closed] (FLINK-11096) Create a new table based flink ML package
[ https://issues.apache.org/jira/browse/FLINK-11096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11096.
Resolution: Duplicate

We decided to move the entire ML pipeline and ML lib development under the umbrella Jira (FLINK-12470) of FLIP39.

> Key: FLINK-11096
> URL: https://issues.apache.org/jira/browse/FLINK-11096
> Project: Flink
> Issue Type: Sub-task
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> Currently, the DataSet-based ML library is under the org.apache.flink.ml Scala package and in the flink-libraries/flink-ml directory.
>
> There are two questions related to packaging:
> # Shall we create a new Scala/Java package, e.g. org.apache.flink.table.ml, or still stay in org.apache.flink.ml?
> # Shall we still put new code in the flink-libraries/flink-ml directory, or create a new one, e.g. flink-libraries/flink-table-ml, with a corresponding Maven package?
>
> I implemented a prototype of the design and found that the new design is very hard to fit into the existing flink.ml codebase. The existing flink.ml code is tightly coupled with the DataSet API, so I had to rewrite almost all parts of flink.ml to get a sample case to work. The only reusable code from flink.ml is the base math classes under the org.apache.flink.ml.math and org.apache.flink.ml.metrics.distance packages.
> Considering this fact, I would prefer to create a new package org.apache.flink.table.ml and a new Maven package flink-table-ml.
>
> Please feel free to give your feedback.
[jira] [Comment Edited] (FLINK-11095) Table based ML Pipeline
[ https://issues.apache.org/jira/browse/FLINK-11095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836837#comment-16836837 ]

Shaoxuan Wang edited comment on FLINK-11095 at 5/10/19 2:48 AM:
We decided to move the entire ML pipeline and ML lib development under the umbrella Jira ([FLINK-12470|https://issues.apache.org/jira/browse/FLINK-12470]) of FLIP39.
was (Author: shaoxuanwang): We decided to move the entire ml pipeline and ml lib development under FLIP39

> Key: FLINK-11095
> URL: https://issues.apache.org/jira/browse/FLINK-11095
> Project: Flink
> Issue Type: New Feature
> Components: Library / Machine Learning
> Reporter: Weihua Jiang
> Priority: Major
>
> As the Table API will be the unified high-level API for both batch and streaming, it is desirable to have a Table API based ML Pipeline definition.
> This new Table-based ML Pipeline shall:
> # support unified batch/stream ML/AI functionality (training/inference).
> # be seamlessly integrated with the Flink Table-based ecosystem.
> # provide a base for further Flink-based AI platform/tooling support.
> # provide a base for further Flink ML SQL integration.
> The initial design is here: [https://docs.google.com/document/d/1PLddLEMP_wn4xHwi6069f3vZL7LzkaP0MN9nAB63X90/edit?usp=sharing]
> Based on this design, I made an initial implementation/prototype. I will share the code later.
> This is the umbrella Jira. I will create a corresponding sub-Jira for each sub-task.
[jira] [Closed] (FLINK-11095) Table based ML Pipeline
[ https://issues.apache.org/jira/browse/FLINK-11095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shaoxuan Wang closed FLINK-11095.
Resolution: Fixed

We decided to move the entire ML pipeline and ML lib development under FLIP39.
[jira] [Created] (FLINK-12470) FLIP39: Flink ML pipeline and ML libs
Shaoxuan Wang created FLINK-12470: - Summary: FLIP39: Flink ML pipeline and ML libs Key: FLINK-12470 URL: https://issues.apache.org/jira/browse/FLINK-12470 Project: Flink Issue Type: New Feature Components: Library / Machine Learning Affects Versions: 1.9.0 Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang Fix For: 1.9.0 This is the umbrella Jira for FLIP39, which intends to enhance the scalability and the ease of use of Flink ML. Discussion thread: [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-39-Flink-ML-pipeline-and-ML-libs-td28633.html] Google Doc: (will be converted to an official Confluence page very soon) [https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo|https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo/edit] In machine learning, there are mainly two groups of people. The first group is MLlib developers. They need a set of standard, well-abstracted core ML APIs to implement algorithms; every ML algorithm is a concrete implementation on top of these APIs. The second group is MLlib users, who use the existing, packaged MLlib to train or serve a model. It is pretty common that an entire training or inference job is constructed from a sequence of transformations or algorithms, so it is essential to provide a workflow/pipeline API that lets MLlib users easily combine multiple algorithms to describe the ML workflow/pipeline. Flink currently has a set of ML core interfaces, but they are built on top of the DataSet API. This does not quite align with the latest Flink [roadmap|https://flink.apache.org/roadmap.html] (the Table API will become a first-class citizen and the primary API for analytics use cases, while the DataSet API will be gradually deprecated). Moreover, Flink at present does not have any interface that allows MLlib users to describe an ML workflow/pipeline, nor any approach to persist a pipeline or model and reuse it in the future.
To solve/improve these issues, in this FLIP we propose to: * Provide a new set of ML core interfaces (on top of the Flink Table API) * Provide an ML pipeline interface (on top of the Flink Table API) * Provide interfaces for parameter management and pipeline persistence * All of the above interfaces should accommodate any new ML algorithm. We will gradually add various standard ML algorithms on top of these newly proposed interfaces to ensure their feasibility and scalability. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
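The Estimator/Transformer/Pipeline split proposed above can be pictured with a minimal, self-contained sketch. These are plain-Java stand-ins: the real FLIP-39 interfaces operate on Flink `Table`s and carry parameter/persistence methods, so all class and method names below are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the Transformer interface: a table-to-table conversion,
// modeled here as a List<Double> -> List<Double> function.
interface Transformer {
    List<Double> transform(List<Double> data);
}

// Stand-in for the Estimator interface: fitting on data yields a Transformer (a model).
interface Estimator {
    Transformer fit(List<Double> data);
}

// A Pipeline chains stages; once all Estimator stages are fitted, the whole
// pipeline is itself a Transformer and can be applied end to end.
class Pipeline implements Transformer {
    private final List<Transformer> stages = new ArrayList<>();

    Pipeline add(Transformer t) {
        stages.add(t);
        return this;
    }

    @Override
    public List<Double> transform(List<Double> data) {
        for (Transformer t : stages) {
            data = t.transform(data);
        }
        return data;
    }
}

// Example Estimator: learns the mean of the training data and returns a
// mean-subtracting Transformer.
class MeanCenterer implements Estimator {
    @Override
    public Transformer fit(List<Double> data) {
        double mean = data.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
        return d -> {
            List<Double> r = new ArrayList<>();
            for (double x : d) r.add(x - mean);
            return r;
        };
    }
}
```

Chaining two transformers and applying the fitted result mirrors how a user would combine multiple algorithms into one workflow, which is the usability goal the FLIP describes.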
[jira] [Closed] (FLINK-11004) Wrong ProcessWindowFunction.process argument in example of Incremental Window Aggregation with ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-11004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang closed FLINK-11004. - Resolution: Fixed Fixed and merged into master > Wrong ProcessWindowFunction.process argument in example of Incremental Window > Aggregation with ReduceFunction > - > > Key: FLINK-11004 > URL: https://issues.apache.org/jira/browse/FLINK-11004 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.2, 1.5.5, 1.6.2 >Reporter: Yuanyang Wu >Priority: Major > Labels: pull-request-available > Original Estimate: 5m > Remaining Estimate: 5m > > [https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/windows.html#incremental-window-aggregation-with-reducefunction] > The example uses the wrong "window" argument in process() > {{Java example}} > {{- out.collect(new Tuple2<Long, SensorReading>({color:#d04437}window{color}.getStart(), min));}} > {{+ out.collect(new Tuple2<Long, SensorReading>({color:#14892c}context.window(){color}.getStart(), min));}} > > {{In the Scala example, the 2nd argument should be context: Context instead of window}} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
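The documentation fix above amounts to reading the window from the `Context` parameter rather than an undefined `window` variable. A minimal mock makes the corrected access path concrete; these are simplified stand-ins, not Flink's actual `ProcessWindowFunction.Context` and `TimeWindow` classes.

```java
// Simplified stand-in for org.apache.flink.streaming.api.windowing.windows.TimeWindow.
class TimeWindow {
    private final long start;
    TimeWindow(long start) { this.start = start; }
    long getStart() { return start; }
}

// Simplified stand-in for ProcessWindowFunction.Context: the window is only
// reachable through context.window(), which is what the docs fix restores.
class Context {
    private final TimeWindow window;
    Context(TimeWindow w) { this.window = w; }
    TimeWindow window() { return window; }
}

class MinWindowFunction {
    // Corrected pattern: the window start comes from context.window().getStart().
    static long windowStartOfMin(Context context) {
        return context.window().getStart();
    }
}
```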
[GitHub] flink pull request #4471: [FLINK-6094] [table] Implement stream-stream proct...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/4471#discussion_r140951770 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala --- @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.datastream.DataStreamJoin
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoin
+import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.runtime.join.WindowJoinUtil
+import scala.collection.JavaConverters._
+
+class DataStreamJoinRule
+  extends ConverterRule(
+    classOf[FlinkLogicalJoin],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.DATASTREAM,
+    "DataStreamJoinRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val join: FlinkLogicalJoin = call.rel(0).asInstanceOf[FlinkLogicalJoin]
+    val joinInfo = join.analyzeCondition
+
+    val (windowBounds, remainingPreds) = WindowJoinUtil.extractWindowBoundsFromPredicate(
+      joinInfo.getRemaining(join.getCluster.getRexBuilder),
+      join.getLeft.getRowType.getFieldCount,
+      join.getRowType,
+      join.getCluster.getRexBuilder,
+      TableConfig.DEFAULT)
+
+    // remaining predicate must not access time attributes
+    val remainingPredsAccessTime = remainingPreds.isDefined &&
+      WindowJoinUtil.accessesTimeAttribute(remainingPreds.get, join.getRowType)
+
+    // Check that no event-time attributes are in the input.
+    val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
+      .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (!windowBounds.isDefined && !remainingPredsAccessTime && !rowTimeAttrInOutput) {
--- End diff --
@fhueske, we actually agree quite a lot on the concern of infinite size you have raised.
The same problem exists not only in joins, but also in other cases, for example GROUP BY, where the grouping key and its associated state can grow so large that Flink's state cannot hold them all. IMO, there is no easy way to completely eliminate this purely through validation in the query planner/optimizer, so I think it is not a good idea to only allow unbounded joins after certain operators, like non-windowed aggregation (in fact, as mentioned above, the grouping key of an aggregation may also be infinite, so this does not ensure finite state for the join operator). On the other hand, I think finite state can only be ensured by users via some hints/controls. We need to instruct users to set those control knobs properly, such that their jobs will not run out of space. One knob we currently have is the state TTL (I think @hequn8128 has already added this for this unbounded join). Maybe we can add a check on the state TTL here to force users to set a proper value. What do you think? ---
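The state-TTL knob discussed in this thread can be illustrated with a self-contained sketch. This is a hypothetical class, not Flink's actual state backend or idle-state-retention implementation: entries whose keys have been idle longer than the TTL are evicted, which is what bounds the otherwise unbounded join state.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Keyed join state with a time-to-live: keys that have been idle longer than
// ttlMillis are dropped, bounding the state size of an unbounded join.
class TtlJoinState<K, V> {
    private static class Entry<V> {
        V value;
        long lastAccess;
    }

    private final Map<K, Entry<V>> state = new HashMap<>();
    private final long ttlMillis;

    TtlJoinState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(K key, V value, long now) {
        Entry<V> e = state.computeIfAbsent(key, k -> new Entry<>());
        e.value = value;
        e.lastAccess = now;
    }

    V get(K key, long now) {
        Entry<V> e = state.get(key);
        if (e == null || now - e.lastAccess > ttlMillis) {
            return null; // expired entries behave as if absent
        }
        e.lastAccess = now; // reading also counts as activity
        return e.value;
    }

    // Called periodically (e.g. from a timer) to drop expired entries.
    void expire(long now) {
        Iterator<Entry<V>> it = state.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().lastAccess > ttlMillis) {
                it.remove();
            }
        }
    }

    int size() {
        return state.size();
    }
}
```

The trade-off the comment describes is visible here: a TTL makes state finite, but the user must pick a retention long enough that rows which still need to join are not evicted early.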
[jira] [Commented] (FLINK-5905) Add user-defined aggregation functions to documentation.
[ https://issues.apache.org/jira/browse/FLINK-5905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16127439#comment-16127439 ] Shaoxuan Wang commented on FLINK-5905: -- This issue will be solved once FLINK-6751 is merged > Add user-defined aggregation functions to documentation. > > > Key: FLINK-5905 > URL: https://issues.apache.org/jira/browse/FLINK-5905 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4546: [FLINK-6751] [docs] Add missing documentation for ...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/4546 [FLINK-6751] [docs] Add missing documentation for User-Defined Functions Page
## What is the purpose of the change - Add missing (TODO) documentation for the User-Defined Functions page ## Brief change log - Add missing (TODO) documentation for the User-Defined Functions page - Fix a typo in a UDAGG comment ## Verifying this change - The documentation has been manually verified with build_docs.sh ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F6751-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4546.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4546 commit 6c253dba072fc21f480588b89c92688eb8da597d Author: shaoxuan-wang Date: 2017-08-15T16:00:12Z [FLINK-6751] [docs] Add missing documentation for User-Defined Functions Page --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Comment Edited] (FLINK-7194) Add getResultType and getAccumulatorType to AggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092983#comment-16092983 ] Shaoxuan Wang edited comment on FLINK-7194 at 7/19/17 12:15 PM: [~fhueske], I am ok with your proposal for the changes to the {{AggregateFunction}} was (Author: shaoxuanwang): [~fhueske], I am ok with your proposal for the changes to the AggregateFunction > Add getResultType and getAccumulatorType to AggregateFunction > - > > Key: FLINK-7194 > URL: https://issues.apache.org/jira/browse/FLINK-7194 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske > > FLINK-6725 and FLINK-6457 proposed to remove methods with default > implementations such as {{getResultType()}}, {{toString()}}, or > {{requiresOver()}} from the base classes of user-defined methods (UDF, UDTF, > UDAGG) and instead offer them as contract methods which are dynamically > In PR [#3993|https://github.com/apache/flink/pull/3993] I argued that these > methods have a fixed signature (in contrast to the {{eval()}}, > {{accumulate()}} and {{retract()}} methods) and should be kept in the > classes. For users that don't need these methods, this doesn't make a > difference because the methods are not abstract and have a default > implementation. For users that need to override the methods it makes a > difference, because they get IDE and compiler support when overriding them > and the cannot get the signature wrong. > Consequently, I propose to add {{getResultType()}} and > {{getAccumulatorType()}} as methods with default implementation to > {{AggregateFunction}}. This will make the interface of {{AggregateFunction}} > more consistent with {{ScalarFunction}} and {{TableFunction}}. > What do you think [~shaoxuan], [~RuidongLi] and [~jark]? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7194) Add getResultType and getAccumulatorType to AggregateFunction
[ https://issues.apache.org/jira/browse/FLINK-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092983#comment-16092983 ] Shaoxuan Wang commented on FLINK-7194: -- [~fhueske], I am ok with your proposal for the changes to the AggregateFunction > Add getResultType and getAccumulatorType to AggregateFunction > - > > Key: FLINK-7194 > URL: https://issues.apache.org/jira/browse/FLINK-7194 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske > > FLINK-6725 and FLINK-6457 proposed to remove methods with default > implementations such as {{getResultType()}}, {{toString()}}, or > {{requiresOver()}} from the base classes of user-defined methods (UDF, UDTF, > UDAGG) and instead offer them as contract methods which are dynamically > In PR [#3993|https://github.com/apache/flink/pull/3993] I argued that these > methods have a fixed signature (in contrast to the {{eval()}}, > {{accumulate()}} and {{retract()}} methods) and should be kept in the > classes. For users that don't need these methods, this doesn't make a > difference because the methods are not abstract and have a default > implementation. For users that need to override the methods it makes a > difference, because they get IDE and compiler support when overriding them > and they cannot get the signature wrong. > Consequently, I propose to add {{getResultType()}} and > {{getAccumulatorType()}} as methods with default implementation to > {{AggregateFunction}}. This will make the interface of {{AggregateFunction}} > more consistent with {{ScalarFunction}} and {{TableFunction}}. > What do you think [~shaoxuan], [~RuidongLi] and [~jark]? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
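The proposal quoted above (keep `getResultType()`/`getAccumulatorType()` as default-implemented methods on the base class) can be sketched in plain Java. The classes below are simplified stand-ins, not Flink's actual API: `String` replaces `TypeInformation`, and the weighted-average aggregate is a made-up example.

```java
// Simplified model of the proposal: the base class ships default
// implementations, so most UDAGGs can ignore these methods, while UDAGGs with
// complex types override them and get @Override compiler checking for free.
abstract class AggregateFunction<T, ACC> {
    public abstract ACC createAccumulator();

    // Defaults: null means "derive the type by reflection" (stand-ins for
    // TypeInformation<T> / TypeInformation<ACC> in the real interface).
    public String getResultType() { return null; }
    public String getAccumulatorType() { return null; }
}

class WeightedAvg extends AggregateFunction<Double, double[]> {
    @Override
    public double[] createAccumulator() {
        return new double[] {0.0, 0.0}; // {weighted sum, total weight}
    }

    // accumulate() has a user-chosen signature, so it cannot live on the base
    // class; this is the contrast the issue draws with getResultType().
    public void accumulate(double[] acc, double value, double weight) {
        acc[0] += value * weight;
        acc[1] += weight;
    }

    public double getValue(double[] acc) {
        return acc[1] == 0 ? 0.0 : acc[0] / acc[1];
    }

    // Overriding the defaulted method is checked by the compiler: a wrong
    // signature here would fail to compile under @Override.
    @Override
    public String getAccumulatorType() {
        return "double[] {weighted sum, total weight}";
    }
}
```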
[jira] [Commented] (FLINK-6751) Table API / SQL Docs: UDFs Page
[ https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092970#comment-16092970 ] Shaoxuan Wang commented on FLINK-6751: -- Sorry for the lack of updates. I will work on this in the next 3-4 days. > Table API / SQL Docs: UDFs Page > --- > > Key: FLINK-6751 > URL: https://issues.apache.org/jira/browse/FLINK-6751 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > > Update and extend the documentation of UDFs in the Table API / SQL: > {{./docs/dev/table/udfs.md}} > Missing sections: > - Registration of UDFs > - UDAGGs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6725) make requiresOver as a contracted method in udagg
[ https://issues.apache.org/jira/browse/FLINK-6725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang closed FLINK-6725. Resolution: Won't Fix > make requiresOver as a contracted method in udagg > - > > Key: FLINK-6725 > URL: https://issues.apache.org/jira/browse/FLINK-6725 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > I realized requiresOver is defined in the udagg interface when I wrote up the > udagg doc. I would like to make requiresOver a contract method. This makes > the entire udagg interface consistent and clean. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #3993: [FLINK-6725][table] make requiresOver as a contracted met...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3993 @fhueske @sunjincheng121 @wuchong, thanks for the valuable input. We talked offline and agreed that we'd better put `getAccumulatorType()`, `getResultType()`, etc. in `AggregateFunction`. Sorry that I forgot to update this Jira. Let us move this change to [FLINK-7194](https://issues.apache.org/jira/browse/FLINK-7194). I will close this PR and [FLINK-6725](https://issues.apache.org/jira/browse/FLINK-6725). ---
[GitHub] flink pull request #3993: [FLINK-6725][table] make requiresOver as a contrac...
Github user shaoxuan-wang closed the pull request at: https://github.com/apache/flink/pull/3993 ---
[jira] [Updated] (FLINK-5878) Add stream-stream inner/left-out join
[ https://issues.apache.org/jira/browse/FLINK-5878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-5878: - Summary: Add stream-stream inner/left-out join (was: Add stream-stream inner join on TableAPI) > Add stream-stream inner/left-out join > - > > Key: FLINK-5878 > URL: https://issues.apache.org/jira/browse/FLINK-5878 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > This task is intended to support stream-stream inner join on tableAPI. A > brief design doc is created: > https://docs.google.com/document/d/10oJDw-P9fImD5tc3Stwr7aGIhvem4l8uB2bjLzK72u4/edit > We propose to use the mapState as the backend state interface for this "join" > operator, so this task requires FLINK-4856. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-5878) Add stream-stream inner/left-out join
[ https://issues.apache.org/jira/browse/FLINK-5878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-5878: Assignee: Hequn Cheng (was: Shaoxuan Wang) > Add stream-stream inner/left-out join > - > > Key: FLINK-5878 > URL: https://issues.apache.org/jira/browse/FLINK-5878 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Hequn Cheng > > This task is intended to support stream-stream inner join on tableAPI. A > brief design doc is created: > https://docs.google.com/document/d/10oJDw-P9fImD5tc3Stwr7aGIhvem4l8uB2bjLzK72u4/edit > We propose to use the mapState as the backend state interface for this "join" > operator, so this task requires FLINK-4856. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7146) FLINK SQLs support DDL
[ https://issues.apache.org/jira/browse/FLINK-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083614#comment-16083614 ] Shaoxuan Wang commented on FLINK-7146: -- [~yuemeng], DDL is an important interface (it is a user-facing interface; once we release it, it is not easy to revise) which we have been working on and polishing for a couple of months. We are hesitant to propose something while several design details are still under debate. We have opened FLINK-6962 for a similar purpose and will post a detailed design doc very soon. Thanks. > FLINK SQLs support DDL > -- > > Key: FLINK-7146 > URL: https://issues.apache.org/jira/browse/FLINK-7146 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > For now, Flink SQL doesn't support DDL; we can only register a table by calling > registerTableInternal in TableEnvironment. > We should support DDL in SQL, such as creating a table or a function like: > {code} > CREATE TABLE kafka_source ( > id INT, > price INT > ) PROPERTIES ( > category = 'source', > type = 'kafka', > version = '0.9.0.1', > separator = ',', > topic = 'test', > brokers = 'xx:9092', > group_id = 'test' > ); > CREATE TABLE db_sink ( > id INT, > price DOUBLE > ) PROPERTIES ( > category = 'sink', > type = 'mysql', > table_name = 'udaf_test', > url = > 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8', > username = 'ds_dev', > password = 's]k51_(>R' > ); > CREATE TEMPORARY function 'AVGUDAF' AS > 'com.x.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF'; > INSERT INTO db_sink SELECT id ,AVGUDAF(price) FROM kafka_source group by id > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6962) SQL DDL for input and output tables
Shaoxuan Wang created FLINK-6962: Summary: SQL DDL for input and output tables Key: FLINK-6962 URL: https://issues.apache.org/jira/browse/FLINK-6962 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: lincoln.lee Fix For: 1.4.0 This Jira adds support for letting users define the DDL for source and sink tables, including the watermark (on the source table) and the emit SLA (on the result table). The detailed design doc will be attached soon. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6963) User Defined Operator
Shaoxuan Wang created FLINK-6963: Summary: User Defined Operator Key: FLINK-6963 URL: https://issues.apache.org/jira/browse/FLINK-6963 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Jark Wu Fix For: 1.4.0 We are proposing to add a User-Defined Operator (UDOP) interface. As opposed to UDF (scalar to scalar), UDTF (scalar to table), and UDAGG (table to scalar), a UDOP allows users to describe business logic for a table-to-table conversion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6961) Enable configurable early-firing rate
Shaoxuan Wang created FLINK-6961: Summary: Enable configurable early-firing rate Key: FLINK-6961 URL: https://issues.apache.org/jira/browse/FLINK-6961 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shaoxuan Wang There are cases where we need to emit results early to give users an observation/sample of the result. Right now, only the unbounded aggregate works in early-firing mode (in the future we will support early firing for all the different scenarios, like windowed aggregates and unbounded/windowed joins). But in the unbounded aggregate, the result is prepared and emitted for each input. This may not be necessary, as users may not need the result this frequently in most cases. We created this Jira to track all the efforts (sub-Jiras) to enable a configurable early-firing rate. It should be noted that the early-firing rate will not be exposed to the user; it will be decided by the query optimizer depending on the SLA (allowed latency) of the final result. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5354) Split up Table API documentation into multiple pages
[ https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027842#comment-16027842 ] Shaoxuan Wang commented on FLINK-5354: -- [~fhueske], thanks for kicking off this. I would like to work on the page for UDFs, as it seems the major missing part is UDAGG. > Split up Table API documentation into multiple pages > - > > Key: FLINK-5354 > URL: https://issues.apache.org/jira/browse/FLINK-5354 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API & SQL >Reporter: Timo Walther > > The Table API documentation page is quite large at the moment. We should > split it up into multiple pages: > Here is my suggestion: > - Overview (Datatypes, Config, Registering Tables, Examples) > - TableSources and Sinks > - Table API > - SQL > - Functions -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6751) Table API / SQL Docs: UDFs Page
[ https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-6751: Assignee: Shaoxuan Wang > Table API / SQL Docs: UDFs Page > --- > > Key: FLINK-6751 > URL: https://issues.apache.org/jira/browse/FLINK-6751 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > > Update and refine {{./docs/dev/table/udfs.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3993: [FLINK-6725][table] make requiresOver as a contracted met...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3993 Thanks for the review @fhueske @sunjincheng121 . Fabian, yes, I do not think this will be a blocker for the release. In fact, this is a similar interface clean-up issue to FLINK-6457, which we want to solve. But certainly, we can just merge this to master and include it in the next release. Jincheng, yes, it is a matter of taste how to implement an interface with contract methods. From my point of view: 1. A contract method is usually very useful as a flag to indicate a characteristic of an aggregator, and this information helps the optimizer decide the plan. Such methods are retract/merge/resetAccumulator. 2. I prefer that we expose the user interface without any default implementations (an interface method with a default implementation will not control user behavior anyway). The user-defined interface, IMO, should purely ask users to provide the minimum required methods that let the UDX work. 3. We will also call getAccumulatorType and getResultType during compilation to generate the plan, I think. If we put requiresOver into the interface, we should consider putting getAccumulatorType and getResultType into the interface as well. Also, I think we may want to provide all the aggregates that require OVER as built-in aggregates in the future; thereby, we may not need to expose this interface in the UDAGG at that point. ---
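Point 1 in the comment above, a "contract method" that is not declared on the base class but discovered by the planner, can be sketched with reflection. The classes and method names below are illustrative stand-ins, not Flink's actual planner code.

```java
import java.lang.reflect.Method;

// A contract method is looked up reflectively; the planner enables a feature
// (here: retraction support) only if the method is actually present.
class ContractMethods {
    static boolean supportsRetract(Object udagg, Class<?>... argTypes) {
        try {
            Method m = udagg.getClass().getMethod("retract", argTypes);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false; // aggregate does not implement the optional contract
        }
    }
}

// Aggregate that opts into retraction by providing the contract method.
class CountAgg {
    public void accumulate(long[] acc) { acc[0]++; }
    public void retract(long[] acc) { acc[0]--; } // optional contract method
}

// Aggregate without the contract method: retraction stays disabled.
class AppendOnlyCountAgg {
    public void accumulate(long[] acc) { acc[0]++; }
}
```

This is the trade-off the thread debates: contract methods keep the minimal interface small, but the compiler cannot check their signatures, whereas default-implemented base-class methods (FLINK-7194's approach) can be verified via `@Override`.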
[GitHub] flink pull request #3993: [FLINK-6725][table] make requiresOver as a contrac...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3993 [FLINK-6725][table] make requiresOver as a contracted method in udagg You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F6725-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3993.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3993 commit 5117a297c089de36723ac836b85502201c927dba Author: shaoxuan-wang Date: 2017-05-26T02:52:31Z [FLINK-6725][table] make requiresOver as a contracted method in udagg
[jira] [Created] (FLINK-6725) make requiresOver as a contracted method in udagg
Shaoxuan Wang created FLINK-6725: Summary: make requiresOver as a contracted method in udagg Key: FLINK-6725 URL: https://issues.apache.org/jira/browse/FLINK-6725 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang I realized requiresOver is defined in the udagg interface when I wrote up the udagg doc. I would like to make requiresOver a contract method. This makes the entire udagg interface consistent and clean. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6637) Move registerFunction to TableEnvironment
[ https://issues.apache.org/jira/browse/FLINK-6637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-6637: - Component/s: Table API & SQL > Move registerFunction to TableEnvironment > - > > Key: FLINK-6637 > URL: https://issues.apache.org/jira/browse/FLINK-6637 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > We are trying to unify stream and batch. This unification should cover > Table API & SQL queries as well as function registration (as part of DDL). > Currently, registerFunction for UDTF and UDAGG is defined separately in > BatchTableEnvironment and StreamTableEnvironment. We should move > registerFunction to TableEnvironment. > The reason we did not put registerFunction into TableEnvironment for > UDTF and UDAGG is that we need different registerFunction implementations for Java and Scala > code, as Java needs a special way to generate and pass the implicit typeInfo > value: > {code:scala} > implicit val typeInfo: TypeInformation[T] = TypeExtractor > .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0) > .asInstanceOf[TypeInformation[T]] > {code} > It seems we would need to duplicate the TableEnvironment class, one for Java and one > for Scala.
[jira] [Created] (FLINK-6637) Move registerFunction to TableEnvironment
Shaoxuan Wang created FLINK-6637: Summary: Move registerFunction to TableEnvironment Key: FLINK-6637 URL: https://issues.apache.org/jira/browse/FLINK-6637 Project: Flink Issue Type: Improvement Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang We are trying to unify stream and batch. This unification should cover Table API & SQL queries as well as function registration (as part of DDL). Currently, registerFunction for UDTF and UDAGG is defined separately in BatchTableEnvironment and StreamTableEnvironment. We should move registerFunction to TableEnvironment. The reason we did not put registerFunction into TableEnvironment for UDTF and UDAGG is that we need different registerFunction implementations for Java and Scala code, as Java needs a special way to generate and pass the implicit typeInfo value: {code:scala} implicit val typeInfo: TypeInformation[T] = TypeExtractor .createTypeInfo(tf, classOf[TableFunction[_]], tf.getClass, 0) .asInstanceOf[TypeInformation[T]] {code} It seems we would need to duplicate the TableEnvironment class, one for Java and one for Scala.
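The Scala snippet in the issue exists because Scala's registerFunction relies on an implicit TypeInformation, while the Java path recovers the result type reflectively. A toy illustration of the kind of reflective extraction that lets a single shared registration method serve both languages (names are illustrative stand-ins, not Flink's actual TypeExtractor):

```java
import java.lang.reflect.ParameterizedType;

// Toy stand-in for Flink's TableFunction<T>, for illustration only.
abstract class ToyTableFunction<T> { }

// A concrete function binds T to a real class, so reflection can see it.
class SplitFunction extends ToyTableFunction<String> { }

public class RegisterSketch {
    // Mimics the idea behind TypeExtractor.createTypeInfo: recover the
    // concrete type argument T from the subclass, so no Scala implicit
    // is needed and the method can live on a shared TableEnvironment.
    static Class<?> extractResultType(ToyTableFunction<?> tf) {
        ParameterizedType p =
            (ParameterizedType) tf.getClass().getGenericSuperclass();
        return (Class<?>) p.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        // Prints the result type recovered from the subclass declaration.
        System.out.println(extractResultType(new SplitFunction()));
    }
}
```

This only works when the subclass declares a concrete type argument; anonymous or generic subclasses defeat it, which is exactly why the Scala API passes the type implicitly instead.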
[GitHub] flink issue #3923: [FLINK-6587] [table] Simplification and bug fixing of the...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3923 @twalthr, it seems we reserve keywords case-insensitively. For instance, we cannot use "Sum" or "SUM", as "sum" is reserved. Did we intend to filter keywords like this? ---
[jira] [Updated] (FLINK-6544) Expose State Backend Interface for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-6544: - Summary: Expose State Backend Interface for UDAGG (was: Expose Backend State Interface for UDAGG) > Expose State Backend Interface for UDAGG > > > Key: FLINK-6544 > URL: https://issues.apache.org/jira/browse/FLINK-6544 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Kaibo Zhou > Assignee: Kaibo Zhou > > Currently, UDAGG users cannot access state. It is necessary to provide users > with a convenient and efficient way to access state within a UDAGG. > This is the design doc: > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit#
[jira] [Assigned] (FLINK-6544) Expose Backend State Interface for UDAGG
[ https://issues.apache.org/jira/browse/FLINK-6544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-6544: Assignee: Kaibo Zhou > Expose Backend State Interface for UDAGG > > > Key: FLINK-6544 > URL: https://issues.apache.org/jira/browse/FLINK-6544 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Kaibo Zhou > Assignee: Kaibo Zhou > > Currently, UDAGG users cannot access state. It is necessary to provide users > with a convenient and efficient way to access state within a UDAGG. > This is the design doc: > https://docs.google.com/document/d/1g-wHOuFj5pMaYMJg90kVIO2IHbiQiO26nWscLIOn50c/edit#
[jira] [Closed] (FLINK-5433) initiate function of Aggregate does not take effect for DataStream aggregation
[ https://issues.apache.org/jira/browse/FLINK-5433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang closed FLINK-5433. Resolution: Fixed Fixed by UDAGG subtask FLINK-5564 > initiate function of Aggregate does not take effect for DataStream aggregation > -- > > Key: FLINK-5433 > URL: https://issues.apache.org/jira/browse/FLINK-5433 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > The initiate function of Aggregate works for DataSet aggregation, but does > not work for DataStream aggregation. > For instance, when given an initial value, say 2, for CountAggregate, the > DataSet aggregate takes this change into account, but the DataStream aggregate does not. > {code} > class CountAggregate extends Aggregate[Long] { > override def initiate(intermediate: Row): Unit = { > intermediate.setField(countIndex, 2L) > } > } > {code} > The output for the DataSet test (testWorkingAggregationDataTypes) is > .select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count) > expected: [1,1,1,1,1.5,1.5,2] > received: [1,1,1,1,1.5,1.5,4] (the result of the last count aggregate is bigger > than the expected value by 2, as expected) > But the output for the DataStream > test (testProcessingTimeSlidingGroupWindowOverCount) remains the same: > .select('string, 'int.count, 'int.avg) > Expected :List(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, Hi,1,1) > Actual :MutableList(Hello world,1,3, Hello world,2,3, Hello,1,2, Hello,2,2, > Hi,1,1)
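To make the reported behavior concrete, here is a toy (non-Flink) version of the seeded counter from the bug report: when initiate() is honored, a count over n elements reports n + 2, which is why the DataSet path returned 4 for 2 elements while the DataStream path, which skipped initiate(), returned 2.

```java
// Toy version of the CountAggregate in the report: initiate() seeds the
// accumulator with 2 instead of 0, so a runtime that honors it should
// report count = n + 2.
class ToyCountAggregate {
    long count;
    void initiate() { count = 2L; }     // the non-default initial value
    void accumulate() { count += 1L; }
    long getValue() { return count; }
}

public class InitiateSketch {
    public static void main(String[] args) {
        ToyCountAggregate agg = new ToyCountAggregate();
        agg.initiate();                 // the DataSet path called this ...
        for (int i = 0; i < 2; i++) {
            agg.accumulate();
        }
        // ... so 2 elements yield 4; the buggy DataStream path skipped
        // initiate() and reported 2 instead.
        System.out.println(agg.getValue()); // 4
    }
}
```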
[jira] [Resolved] (FLINK-5915) Add support for the aggregate on multi fields
[ https://issues.apache.org/jira/browse/FLINK-5915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang resolved FLINK-5915. -- Resolution: Fixed This is completely resolved by FLINK-5906. > Add support for the aggregate on multi fields > - > > Key: FLINK-5915 > URL: https://issues.apache.org/jira/browse/FLINK-5915 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > Fix For: 1.3.0 > > > Some UDAGGs take multiple fields as input. For instance, > table > .window(Tumble over 10.minutes on 'rowtime as 'w) > .groupBy('key, 'w) > .select('key, weightedAvg('value, 'weight)) > This task adds support for aggregates on multiple fields.
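A self-contained sketch of what a multi-field UDAGG like weightedAvg('value, 'weight) computes, with a hypothetical two-argument accumulate (toy Java, not Flink's AggregateFunction API):

```java
// Toy weighted-average accumulator; a real Flink UDAGG would extend
// AggregateFunction and these would be its accumulate/getValue methods.
class WeightedAvgAccumulator {
    long sum;
    long weight;
}

public class WeightedAvgSketch {
    // Two-field input: the value and its weight, matching
    // weightedAvg('value, 'weight) in the Table API example.
    static void accumulate(WeightedAvgAccumulator acc, long value, long weight) {
        acc.sum += value * weight;
        acc.weight += weight;
    }

    static long getValue(WeightedAvgAccumulator acc) {
        return acc.weight == 0 ? 0 : acc.sum / acc.weight;
    }

    public static void main(String[] args) {
        WeightedAvgAccumulator acc = new WeightedAvgAccumulator();
        accumulate(acc, 10, 1);
        accumulate(acc, 20, 3);
        // (10*1 + 20*3) / (1 + 3) = 70 / 4 -> 17 in integer arithmetic
        System.out.println(getValue(acc)); // 17
    }
}
```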
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692346 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. 
+ * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. 
+ */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { --- End diff -- Yes, if the last parameter is a component, say Array[Int]. We want to get the type of Int, not the type of Array. --- If your project is set up for it, you can reply to this email and have your reply appe
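The review reply above, restated in plain reflection terms: when the last accumulate() parameter is an array (i.e. varargs), operand inference wants the element type (Int), not the array type. Java's getComponentType expresses exactly that distinction, returning the element type for arrays and null otherwise:

```java
public class ComponentTypeSketch {
    public static void main(String[] args) {
        // For an array type, getComponentType yields the element type ...
        System.out.println(int[].class.getComponentType());         // int
        // ... and for a non-array type it yields null, which is the
        // check the inference code above performs on the last parameter.
        System.out.println(String.class.getComponentType());        // null
    }
}
```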
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114692276 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.functions.utils + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`._ +import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency +import org.apache.calcite.sql.parser.SqlParserPos +import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction +import org.apache.flink.api.common.typeinfo._ +import org.apache.flink.table.api.ValidationException +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.table.functions.utils.AggSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference} +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ + +/** + * Calcite wrapper for user-defined aggregate functions. 
+ * + * @param name function name (used by SQL parser) + * @param aggregateFunction aggregate function to be called + * @param returnType the type information of returned value + * @param typeFactory type factory for converting Flink's between Calcite's types + */ +class AggSqlFunction( +name: String, +aggregateFunction: AggregateFunction[_, _], +returnType: TypeInformation[_], +typeFactory: FlinkTypeFactory) + extends SqlUserDefinedAggFunction( +new SqlIdentifier(name, SqlParserPos.ZERO), +createReturnTypeInference(returnType, typeFactory), +createOperandTypeInference(aggregateFunction, typeFactory), +createOperandTypeChecker(aggregateFunction), +// Do not need to provide a calcite aggregateFunction here. Flink aggregateion function +// will be generated when translating the calcite relnode to flink runtime execution plan +null + ) { + + def getFunction: AggregateFunction[_, _] = aggregateFunction +} + +object AggSqlFunction { + + def apply( + name: String, + aggregateFunction: AggregateFunction[_, _], + returnType: TypeInformation[_], + typeFactory: FlinkTypeFactory): AggSqlFunction = { + +new AggSqlFunction(name, aggregateFunction, returnType, typeFactory) + } + + private[flink] def createOperandTypeInference( + aggregateFunction: AggregateFunction[_, _], + typeFactory: FlinkTypeFactory) + : SqlOperandTypeInference = { +/** + * Operand type inference based on [[AggregateFunction]] given information. 
+ */ +new SqlOperandTypeInference { + override def inferOperandTypes( + callBinding: SqlCallBinding, + returnType: RelDataType, + operandTypes: Array[RelDataType]): Unit = { + +val operandTypeInfo = getOperandTypeInfo(callBinding) + +val foundSignature = getAccumulateMethodSignature(aggregateFunction, operandTypeInfo) + .getOrElse(throw new ValidationException(s"Operand types of could not be inferred.")) + +val inferredTypes = getParameterTypes(aggregateFunction, foundSignature.drop(1)) + .map(typeFactory.createTypeFromTypeInfo) + +for (i <- operandTypes.indices) { + if (i < inferredTypes.length - 1) { +operandTypes(i) = inferredTypes(i) + } else if (null != inferredTypes.last.getComponentType) { +// last argument is a collection, the array type +operandTypes(i) = inferredTypes.last.getComponentType + } else { +
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114690845 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala --- @@ -130,3 +142,63 @@ case class Avg(child: Expression) extends Aggregation { new SqlAvgAggFunction(AVG) } } + +case class UDAGGFunctionCall( +aggregateFunction: AggregateFunction[_, _], +args: Seq[Expression]) + extends Aggregation { + + override private[flink] def children: Seq[Expression] = args + + // Override makeCopy method in TreeNode, to produce vargars properly + override def makeCopy(args: Array[AnyRef]): this.type = { +if (args.length < 1) { + throw new TableException("Invalid constructor params") +} +val agg = args.head.asInstanceOf[AggregateFunction[_, _]] +val arg = args.last.asInstanceOf[Seq[Expression]] +new UDAGGFunctionCall(agg, arg).asInstanceOf[this.type] + } + + override def resultType: TypeInformation[_] = TypeExtractor.createTypeInfo( +aggregateFunction, classOf[AggregateFunction[_, _]], aggregateFunction.getClass, 0) + + override def validateInput(): ValidationResult = { +val signature = children.map(_.resultType) +// look for a signature that matches the input types +val foundSignature = getAccumulateMethodSignature(aggregateFunction, signature) +if (foundSignature.isEmpty) { + ValidationFailure(s"Given parameters do not match any signature. 
\n" + + s"Actual: ${signatureToString(signature)} \n" + + s"Expected: ${signaturesToString(aggregateFunction, "accumulate")}") +} else { + ValidationSuccess +} + } + + override def toString(): String = s"${aggregateFunction.getClass.getSimpleName}($args)" + + override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = { +val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory] +val sqlFunction = AggSqlFunction(name, aggregateFunction, resultType, typeFactory) --- End diff -- I was trying to keep this name consistent with `ScalaSqlFunction` and `TableSqlFunction` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114579772 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/DSetUDAGGITCase.scala --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.batch.table + +import java.math.BigDecimal + +import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv} +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMergeAndReset} +import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase +import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.mockito.Mockito.{mock, when} + +import scala.collection.JavaConverters._ + +/** + * We only test some aggregations until better testing of constructed DataSet + * programs is possible. + */ +@RunWith(classOf[Parameterized]) +class DSetUDAGGITCase(configMode: TableConfigMode) --- End diff -- Sounds good to me. I did not have the UDAGG design across all different aggregation tests, as I feel the current agg tests are a little mess up. It always takes me a while to find the right test cases among all different test files. I put UDAGG test cases into one file which helps me to easily understand what kinds of tests have been covered. I think we need to think about how to reorganize our agg test structure. Considering the short time to freeze feature, let us keep the current structure (I will split the UDAGG into all different agg test files) and massage the tests later. 
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114576600 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala --- @@ -178,4 +178,24 @@ class BatchTableEnvironment( registerTableFunctionInternal[T](name, tf) } + + /** +* Registers an [[AggregateFunction]] under a unique name in the TableEnvironment's catalog. +* Registered functions can be referenced in Table API and SQL queries. +* +* @param name The name under which the function is registered. +* @param f The AggregateFunction to register. +* @tparam T The type of the output value. +* @tparam ACC The type of aggregate accumulator. +*/ + def registerFunction[T, ACC]( + name: String, + f: AggregateFunction[T, ACC]) + : Unit = { +implicit val typeInfo: TypeInformation[T] = TypeExtractor --- End diff -- Thanks @twalthr. If I understand you correctly, you suggest creating a contract method `getResultType` for UDAGG, so that the user can provide the result type in case type extraction fails. Sounds good to me. Could you give some examples of when type extraction will fail (for instance, a Row type?) and why it may fail, so that I can add some test cases? ---
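The fallback discussed here can be sketched independently of Flink: attempt reflective extraction first, and consult a user-supplied getResultType() only when the type argument cannot be resolved (e.g. it is still a type variable). All names below are hypothetical toys, not Flink's API:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

abstract class ToyAggFunction<T> {
    // Optional contract method for cases where extraction fails.
    public Class<?> getResultType() { return null; }
}

// Extraction works: T is bound to a concrete class.
class LongAgg extends ToyAggFunction<Long> { }

// Extraction fails: T stays a type variable, so the user overrides.
class GenericAgg<T> extends ToyAggFunction<T> {
    @Override public Class<?> getResultType() { return Object[].class; }
}

public class ResultTypeSketch {
    static Class<?> resolveResultType(ToyAggFunction<?> f) {
        Type t = ((ParameterizedType) f.getClass().getGenericSuperclass())
            .getActualTypeArguments()[0];
        if (t instanceof Class) {
            return (Class<?>) t;            // extraction succeeded
        }
        Class<?> declared = f.getResultType();  // fall back to the contract method
        if (declared == null) {
            throw new IllegalStateException(
                "cannot extract result type; override getResultType()");
        }
        return declared;
    }

    public static void main(String[] args) {
        System.out.println(resolveResultType(new LongAgg()));
        System.out.println(resolveResultType(new GenericAgg<String>()));
    }
}
```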
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114571910 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -327,4 +332,56 @@ object ProjectionTranslator { } } + /** +* Find and replace UDAGG function Call to UDAGGFunctionCall +* +* @param field the expression to check +* @param tableEnv the TableEnvironment +* @return an expression with correct UDAGGFunctionCall type for UDAGG functions +*/ + def replaceUDAGGFunctionCall(field: Expression, tableEnv: TableEnvironment): Expression = { --- End diff -- We will not have the chance to execute LogicalNode#resolveExpressions before getting aggNames, projectFields, etc. I actually tried an alternative approach that conducts the replacement in extractAggregationsAndProperties and replaceAggregationsAndProperties (we have to check and handle the UDAGG call carefully in both functions); it works, but I do not like that design, as it leaves the logic of those two methods less clean. Also, over aggregates do not call extractAggregationsAndProperties and replaceAggregationsAndProperties. So I decided to implement a separate function to handle the UDAGGFunctionCall replacement. ---
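The separate-replacement approach described above amounts to a post-order rewrite over the expression tree: resolve children first, then swap any call whose name matches a registered UDAGG for a dedicated node type. A hypothetical, non-Flink sketch (toy node types; the real code walks Flink Expression trees):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy expression node; "name" stands in for an unresolved call name.
class Expr {
    String name;
    List<Expr> children = new ArrayList<>();
    Expr(String name) { this.name = name; }
}

// Dedicated node type the rewrite produces, like UDAGGFunctionCall.
class UdaggCall extends Expr {
    UdaggCall(String name, List<Expr> args) { super(name); children = args; }
}

public class ReplaceSketch {
    // Post-order rewrite: children first, then the node itself.
    static Expr replaceUdaggCalls(Expr e, Set<String> registeredUdaggs) {
        List<Expr> newChildren = new ArrayList<>();
        for (Expr c : e.children) {
            newChildren.add(replaceUdaggCalls(c, registeredUdaggs));
        }
        e.children = newChildren;
        if (!(e instanceof UdaggCall) && registeredUdaggs.contains(e.name)) {
            return new UdaggCall(e.name, e.children);
        }
        return e;
    }

    public static void main(String[] args) {
        Expr call = new Expr("weightedAvg");
        call.children.add(new Expr("'value"));
        call.children.add(new Expr("'weight"));
        Expr rewritten = replaceUdaggCalls(call, Set.of("weightedAvg"));
        System.out.println(rewritten instanceof UdaggCall); // true
    }
}
```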
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3809#discussion_r114498337 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/DStreamUDAGGITCase.scala --- @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.table + +import java.math.BigDecimal + +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream} +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv} +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv} +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge, WeightedAvgWithRetract} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.DStreamUDAGGITCase.TimestampAndWatermarkWithOffset +import org.apache.flink.table.api.scala.stream.utils.StreamITCase +import org.apache.flink.table.api.{SlidingWindow, TableEnvironment, Types} +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.apache.flink.table.functions.aggfunctions.CountAggFunction +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.mockito.Mockito.{mock, when} + +import scala.collection.mutable + +/** + * We only test some aggregations until better testing of constructed DataStream + * programs is possible. 
+ */ +class DStreamUDAGGITCase + extends StreamingMultipleProgramsTestBase { + + val data = List( +//('long, 'int, 'double, 'float, 'bigdec, 'string) +(1000L, 1, 1d, 1f, new BigDecimal("1"), "Hello"), +(2000L, 2, 2d, 2f, new BigDecimal("2"), "Hello"), +(3000L, 3, 3d, 3f, new BigDecimal("3"), "Hello"), +(5000L, 5, 5d, 5f, new BigDecimal("5"), "Hi"), +(6000L, 6, 6d, 6f, new BigDecimal("6"), "Hi"), +(7000L, 7, 7d, 7f, new BigDecimal("7"), "Hi"), +(8000L, 8, 8d, 8f, new BigDecimal("8"), "Hello"), +(9000L, 9, 9d, 9f, new BigDecimal("9"), "Hello"), +(4000L, 4, 4d, 4f, new BigDecimal("4"), "Hello"), +(1L, 10, 10d, 10f, new BigDecimal("10"), "Hi"), +(11000L, 11, 11d, 11f, new BigDecimal("11"), "Hi"), +(12000L, 12, 12d, 12f, new BigDecimal("12"), "Hi"), +(16000L, 16, 16d, 16f, new BigDecimal("16"), "Hello")) + + @Test + def testUdaggSlidingWindowGroupedAggregate(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() + +val stream = env.fromCollection(data).map(t => (t._1, t._2, t._3, t._4, t._6)) +val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'string) + +val countFun = new CountAggFunction + +val weightAvgFun = new WeightedAvg + +val windowedTable = table + .window(Slide over 4.rows every 2.rows as 'w) + .groupBy('w, 'string) + .select(
[GitHub] flink issue #3809: [FLINK-5906] [table] Add support to register UDAGG in Tab...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3809 Thanks for the review, @fhueske, and very good point regarding the approach to "replace UDAGGFunctionCall". I have addressed your comments. Please take a look. ---
[GitHub] flink pull request #3809: [FLINK-5906] [table] Add support to register UDAGG...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3809

[FLINK-5906] [table] Add support to register UDAGG in Table and SQL API

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F5906-submit

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3809.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3809

commit ae2e0ae45f4a4e41a67de6c7feda427b4981c079
Author: shaoxuan-wang
Date: 2017-05-02T15:00:51Z

[FLINK-5906] [table] Add support to register UDAGG in Table and SQL API
[jira] [Assigned] (FLINK-5905) Add user-defined aggregation functions to documentation.
[ https://issues.apache.org/jira/browse/FLINK-5905?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-5905: Assignee: Shaoxuan Wang > Add user-defined aggregation functions to documentation. > > > Key: FLINK-5905 > URL: https://issues.apache.org/jira/browse/FLINK-5905 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske > Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5906) Add support to register UDAGG in Table and SQL API
[ https://issues.apache.org/jira/browse/FLINK-5906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-5906: - Summary: Add support to register UDAGG in Table and SQL API (was: Add support to register UDAGGs in TableEnvironment) > Add support to register UDAGG in Table and SQL API > -- > > Key: FLINK-5906 > URL: https://issues.apache.org/jira/browse/FLINK-5906 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske > Assignee: Shaoxuan Wang >
[GitHub] flink issue #3768: [FLINK-6368][table] Grouping keys in stream aggregations ...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3768 I ran into the same problem today when adding the new test cases for UDAGG. Thanks for the fix, @xccui
[GitHub] flink issue #3762: [FLINK-6361] [table] Refactoring the AggregateFunction in...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3762 Hi @fhueske, it is good to have a dedicated Iterable for pair-merge. Please go ahead. Thanks.
[GitHub] flink pull request #3762: [FLINK-6361] [table] Refactoring the AggregateFunc...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3762

[FLINK-6361] [table] Refactoring the AggregateFunction interface and built-in aggregates

This PR includes the following changes: 1) remove the Accumulator trait; 2) move the accumulate, retract, merge, resetAccumulator, and getAccumulatorType methods out of the AggregateFunction interface, and allow them to be defined as contracted methods for UDAGG; 3) refactor the built-in aggregates accordingly; 4) fix a build warning in flink/table/api/Types.scala (unrelated to FLINK-6361).

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6361-submit

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3762.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3762

commit b5806b1a3975186d69a76fd369ff77cf06e1e67f
Author: shaoxuan-wang
Date: 2017-04-24T16:28:37Z

[FLINK-6361] [table] Refactoring the AggregateFunction interface and built-in aggregates
[jira] [Created] (FLINK-6361) Finalize the AggregateFunction interface and refactoring built-in aggregates
Shaoxuan Wang created FLINK-6361: Summary: Finalize the AggregateFunction interface and refactoring built-in aggregates Key: FLINK-6361 URL: https://issues.apache.org/jira/browse/FLINK-6361 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang We have completed codeGen for all aggregate runtime functions. Now we can finalize the AggregateFunction. This includes: 1) remove the Accumulator trait; 2) remove the accumulate, retract, merge, resetAccumulator, and getAccumulatorType methods from the interface, and allow them as contracted methods for UDAGG; 3) refactor the built-in aggregates accordingly.
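The "contracted methods" idea described in FLINK-6361 can be sketched with a minimal, self-contained example (hypothetical classes, not the actual Flink API): the base class only declares accumulator creation and value extraction, while accumulate, retract, and merge are plain methods that a runtime would discover by convention (via reflection or code generation) instead of interface overrides.

```java
// Minimal sketch of an aggregate whose accumulate/retract/merge are
// "contracted" (convention-based) methods rather than interface members.
// All names here are illustrative, not the real Flink classes.
public class CountAggSketch {

    // Base class: only accumulator creation and value extraction are required.
    public abstract static class AggregateFunction<T, ACC> {
        public abstract ACC createAccumulator();
        public abstract T getValue(ACC accumulator);
    }

    public static class CountAccumulator {
        public long count = 0L;
    }

    public static class CountAggFunction extends AggregateFunction<Long, CountAccumulator> {
        @Override
        public CountAccumulator createAccumulator() { return new CountAccumulator(); }

        @Override
        public Long getValue(CountAccumulator acc) { return acc.count; }

        // Contracted methods: not declared on the base class; a real runtime
        // would look them up by name and signature.
        public void accumulate(CountAccumulator acc, Object value) {
            if (value != null) acc.count++;
        }

        public void retract(CountAccumulator acc, Object value) {
            if (value != null) acc.count--;
        }

        public void merge(CountAccumulator acc, Iterable<CountAccumulator> others) {
            for (CountAccumulator other : others) acc.count += other.count;
        }
    }

    public static void main(String[] args) {
        CountAggFunction count = new CountAggFunction();
        CountAccumulator acc = count.createAccumulator();
        count.accumulate(acc, "a");
        count.accumulate(acc, "b");
        count.retract(acc, "a");
        System.out.println(count.getValue(acc)); // prints 1
    }
}
```

Because the optional methods are no longer part of the interface, a simple aggregate only has to implement the two abstract methods, which is the simplification the issue aims at.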
[jira] [Resolved] (FLINK-5813) code generation for user-defined aggregate functions
[ https://issues.apache.org/jira/browse/FLINK-5813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang resolved FLINK-5813. -- Resolution: Fixed All required issues are solved. > code generation for user-defined aggregate functions > > > Key: FLINK-5813 > URL: https://issues.apache.org/jira/browse/FLINK-5813 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > The input and return types of the new proposed UDAGG functions are > dynamically given by the users. All these user defined functions have to be > generated via codegen.
[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3735 @fhueske, your changes look good to me, I left a few comments.
[jira] [Updated] (FLINK-6334) Refactoring UDTF interface
[ https://issues.apache.org/jira/browse/FLINK-6334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang updated FLINK-6334: - Description: The current UDTF leverages the table.join(expression) interface, which is not a proper interface in terms of semantics. We would like to refactor this to let UDTF use table.join(table) interface. Very briefly, UDTF's apply method will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table) (was: UDTF's apply method returns a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as join(Table)) > Refactoring UDTF interface > -- > > Key: FLINK-6334 > URL: https://issues.apache.org/jira/browse/FLINK-6334 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li > > The current UDTF leverages the table.join(expression) interface, which is not > a proper interface in terms of semantics. We would like to refactor this to > let UDTF use table.join(table) interface. Very briefly, UDTF's apply method > will return a Table Type, so Join(UDTF('a, 'b, ...) as 'c) shall be viewed as > join(Table)
[GitHub] flink issue #3735: [FLINK-6242] [table] Add code generation for DataSet Aggr...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3735 @fhueske thanks for your feedback. Yes, we could keep the GeneratedAggregations interface very clean:

```
abstract class GeneratedAggregations extends Function {
  def setAggregationResults(accumulators: Row, output: Row)
  def setForwardedFields(input: Row, output: Row)
  def accumulate(accumulators: Row, input: Row)
  def retract(accumulators: Row, input: Row)
  def createAccumulators(): Row
  def mergeAccumulatorsPair(a: Row, b: Row): Row
  def resetAccumulator(accumulators: Row)
}
```

But I feel it is not ideal to add more parameters to the code generation function, as callers will usually have to construct unnecessary empty parameters. I think we can break the code generation into 2-3 functions (these are just the interfaces that process the code-gen parameters; the fundamental implementation of each function will be shared). Let me prototype the changes, and we can continue the discussion from there. Regarding your other comments: I did not look into the logic of the previous implementations and just focused on the code-gen. I will take a look and optimize them.
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3735#discussion_r111986331

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)

  /**
+  * Calculates the results from accumulators, and sets the results to the output (with key offset)
+  *
+  * @param accumulators the accumulators (saved in a row) which contains the current aggregated results
+  * @param output output results collected in a row
+  */
+ def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

This sounds like a very good idea. I have actually thought about merging the *WithKeyOffset functions into the existing functions. It works for most functions, but `setAggregationResults` and `merge` are a little tricky: `accumulate` and `setAggregationResults` do not need the key offset, but `merge` does.
[GitHub] flink pull request #3735: [FLINK-6242] [table] Add code generation for DataS...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3735

[FLINK-6242] [table] Add code generation for DataSet Aggregates

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6242-submit

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3735.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3735

commit 16e8aa7400f1b9a9f490522427f269fd01a0f640
Author: shaoxuan-wang
Date: 2017-04-18T13:45:49Z

[FLINK-6242] [table] Add code generation for DataSet Aggregates
[GitHub] flink issue #3694: [FLINK-6240] [table] codeGen dataStream aggregates that u...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3694 Thanks @fhueske, I have addressed your comments, please take a look.
[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3694#discussion_r111813245

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -792,6 +792,20 @@ object AggregateUtil { inputType, needRetraction = false)
+val forwardMapping = (0 until inputType.getFieldCount).map(x => (x, x)).toArray
--- End diff --

Thanks @fhueske, I have addressed your comments, please take a look.
[GitHub] flink issue #3696: [FLINK-6090] [table] Add RetractionRule at the stage of d...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3696 @fhueske, thanks for the review and valuable comments. Yes, we'd better add the attributes (which provide the information for deriving the ACCMode) inside the DataStreamRel interface, so that the code will be much cleaner and easier to understand. Thanks, Shaoxuan
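The attribute-based design discussed above can be illustrated with a small self-contained sketch (all names are hypothetical, not the actual Flink API): each physical rel node exposes attributes describing its update behavior, and the retraction rule derives the ACC mode purely from those attributes instead of hard-coding per-node-type logic.

```java
// Hypothetical sketch of deriving an accumulate/retract (ACC) mode from
// attributes on the rel nodes; attribute and class names are illustrative.
public class AccModeSketch {

    public interface DataStreamRel {
        // Does this node emit updates to previously emitted results?
        default boolean producesUpdates() { return false; }
        // Does this node require upstream updates to arrive as retractions?
        default boolean needsUpdatesAsRetraction() { return false; }
    }

    public static class GroupAggregate implements DataStreamRel {
        @Override public boolean producesUpdates() { return true; }
    }

    public static class OverWindow implements DataStreamRel {
        @Override public boolean needsUpdatesAsRetraction() { return true; }
    }

    public static class Calc implements DataStreamRel { }

    // The rule only inspects attributes, not concrete node types.
    public static String deriveAccMode(DataStreamRel consumer, DataStreamRel producer) {
        return (consumer.needsUpdatesAsRetraction() && producer.producesUpdates())
                ? "AccRetract" : "Acc";
    }

    public static void main(String[] args) {
        System.out.println(deriveAccMode(new Calc(), new GroupAggregate()));       // Acc
        System.out.println(deriveAccMode(new OverWindow(), new GroupAggregate())); // AccRetract
    }
}
```

Adding a new physical node then only requires overriding the relevant attributes, which is why the rule's code becomes cleaner.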
[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3694#discussion_r110345902 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala --- @@ -18,69 +18,51 @@ package org.apache.flink.table.runtime.aggregate -import java.util.{ArrayList => JArrayList, List => JList} -import org.apache.flink.api.common.functions.{AggregateFunction => DataStreamAggFunc} -import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.functions.AggregateFunction +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.types.Row +import org.slf4j.LoggerFactory /** * Aggregate Function used for the aggregate operator in * [[org.apache.flink.streaming.api.datastream.WindowedStream]] * - * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] - * used for this aggregation - * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param genAggregations Generated aggregate helper function */ class AggregateAggFunction( -private val aggregates: Array[AggregateFunction[_]], -private val aggFields: Array[Array[Int]]) - extends DataStreamAggFunc[Row, Row, Row] { +genAggregations: GeneratedAggregationsFunction) + extends AggregateFunction[Row, Row, Row] --- End diff -- I have thought about this. But unfortunately, the aggregate in WindowedStream does not support a RichFunction:

```
public SingleOutputStreamOperator aggregate( ...
  if (aggregateFunction instanceof RichFunction) {
    throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
  }
```
[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3694#discussion_r110344806 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -403,6 +403,41 @@ class CodeGenerator( | }""".stripMargin } +def generateMergeTwoRows( +accTypes: Array[String], +aggs: Array[String]): String = { + + val sig: String = +j""" + | public org.apache.flink.types.Row mergeTwoRows( + |org.apache.flink.types.Row a, + |org.apache.flink.types.Row b) + |""".stripMargin + val merge: String = { +for (i <- aggs.indices) yield + j""" + |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i); + |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i); + |java.util.ArrayList<${accTypes(i)}> accumulators$i --- End diff -- I am planning to remove `trait Accumulator` very soon; we cannot reuse the list after that.
[GitHub] flink pull request #3694: [FLINK-6240] [table] codeGen dataStream aggregates...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3694

[FLINK-6240] [table] codeGen dataStream aggregates that use AggregateAggFunction

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6240-submit

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3694.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3694

commit 22048039ec620eda536b3290453cb7acd177
Author: shaoxuan-wang
Date: 2017-04-07T08:18:14Z

[FLINK-6240] [table] codeGen dataStream aggregates that use AggregateAggFunction
[GitHub] flink pull request #3646: [FLINK-6216] [table] DataStream unbounded groupby ...
Github user shaoxuan-wang closed the pull request at: https://github.com/apache/flink/pull/3646
[GitHub] flink pull request #3646: [FLINK-6216] [table] DataStream unbounded groupby ...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3646#discussion_r110073336 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala --- @@ -299,27 +299,6 @@ class WindowAggregateTest extends TableTestBase { } @Test - def testGroupWithFloorExpression() = { -val sql = "SELECT COUNT(*) FROM MyTable GROUP BY FLOOR(localTimestamp TO HOUR)" --- End diff -- Yes, @fhueske. Let us merge this to the retraction branch. I agree with you: with FLINK-6011, I think we should now treat only (TUMBLE, HOP, SESSION) as windows, while FLOOR and others as functions (and their return values should be grouping keys).
[GitHub] flink issue #3676: [FLINK-6241] [table] codeGen dataStream aggregates that u...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3676 Thanks @fhueske, these changes look good to me.
[GitHub] flink issue #3646: [FLINK-6216] [table] DataStream unbounded groupby aggrega...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3646 @fhueske, thanks for your review. I have addressed all your comments and rebased the code onto master. Please take a look.
[GitHub] flink pull request #3676: [FLINK-6241] [table] codeGen dataStream aggregates...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3676

[FLINK-6241] [table] codeGen dataStream aggregates that use ProcessFunction

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shaoxuan-wang/flink F6241-submit

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3676.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3676

commit 11d35093714984c4e27742c4105b70e474897707
Author: shaoxuan-wang
Date: 2017-04-05T14:34:43Z

[FLINK-6241] [table] codeGen dataStream aggregates that use ProcessFunction
[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3665 So we finally got those supported by Calcite 1.12! Really excited to see these features supported in Flink SQL. Thanks @haohui.
[jira] [Commented] (FLINK-5564) User Defined Aggregates
[ https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15952127#comment-15952127 ] Shaoxuan Wang commented on FLINK-5564: -- [~fhueske], implementing the entire codeGen in one PR results in thousands of lines of code changes; it is hard to review. I split FLINK-5813 into three sub-tasks. > User Defined Aggregates > --- > > Key: FLINK-5564 > URL: https://issues.apache.org/jira/browse/FLINK-5564 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > > User-defined aggregates would be a great addition to the Table API / SQL. > The current aggregate interface is not well suited for external users. > This issue proposes to redesign the aggregate such that we can expose a > better external UDAGG interface to the users. The detailed design proposal > can be found here: > https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit > Motivation: > 1. The current aggregate interface is not very concise for the users. One > needs to know the design details of the intermediate Row buffer before > implementing an Aggregate. Seven functions are needed even for a simple Count > aggregate. > 2. Another limitation of the current aggregate function is that it can only be > applied on one single column. There are many scenarios which require the > aggregate function to take multiple columns as the inputs. > 3. “Retraction” is not considered and covered in the current Aggregate. > 4. It might be very good to have a local/global aggregate query plan > optimization, which is very promising to optimize UDAGG performance in some > scenarios. > Proposed Changes: > 1. Implement an aggregate dataStream API (Done by > [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582]) > 2. Update all the existing aggregates to use the new aggregate dataStream API > 3. Provide a better User-Defined Aggregate interface > 4. Add retraction support > 5. Add local/global aggregate
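Motivation point 2 above (aggregates over multiple input columns) can be illustrated with a hypothetical sketch that is not tied to the Flink API: a weighted average consumes two columns per row, a value and its weight, which a strictly single-column aggregate interface cannot express.

```java
// Hypothetical multi-column aggregate: weighted average over (value, weight)
// pairs. Class and method names are illustrative, not the Flink API.
public class WeightedAvgSketch {

    public static class WAccumulator {
        public double weightedSum = 0.0;
        public long totalWeight = 0L;
    }

    public static WAccumulator createAccumulator() { return new WAccumulator(); }

    // Two input columns per row: the value and its weight.
    public static void accumulate(WAccumulator acc, double value, long weight) {
        acc.weightedSum += value * weight;
        acc.totalWeight += weight;
    }

    public static double getValue(WAccumulator acc) {
        return acc.totalWeight == 0L ? 0.0 : acc.weightedSum / acc.totalWeight;
    }

    public static void main(String[] args) {
        WAccumulator acc = createAccumulator();
        accumulate(acc, 10.0, 1L);
        accumulate(acc, 20.0, 3L);
        System.out.println(getValue(acc)); // (10*1 + 20*3) / 4 = 17.5
    }
}
```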
[jira] [Created] (FLINK-6242) codeGen DataSet GroupingWindow Aggregates
Shaoxuan Wang created FLINK-6242: Summary: codeGen DataSet GroupingWindow Aggregates Key: FLINK-6242 URL: https://issues.apache.org/jira/browse/FLINK-6242 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang
[jira] [Created] (FLINK-6241) codeGen dataStream aggregates that use ProcessFunction
Shaoxuan Wang created FLINK-6241: Summary: codeGen dataStream aggregates that use ProcessFunction Key: FLINK-6241 URL: https://issues.apache.org/jira/browse/FLINK-6241 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang
[jira] [Created] (FLINK-6240) codeGen dataStream aggregates that use AggregateAggFunction
Shaoxuan Wang created FLINK-6240: Summary: codeGen dataStream aggregates that use AggregateAggFunction Key: FLINK-6240 URL: https://issues.apache.org/jira/browse/FLINK-6240 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang
[GitHub] flink issue #3647: [FLINK-5915] [table] forward the entire aggregate ArgList...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3647 Thanks @fhueske, I overlooked that a List is not always an ArrayList. I changed it to a Scala Array, but kept the element type as Integer, as this is the type returned by aggregateCall.getArgList (and not easy to cast to Int). I updated the PR and also rebased onto master (as I noticed a few over aggregates have been merged recently), so it will be easier for you to merge.
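The point about keeping the boxed Integer element type can be shown in a few lines of plain Java (the `List<Integer>` below is only a stand-in for Calcite's `aggregateCall.getArgList()`, not an actual Calcite call): converting to an array with `toArray(new Integer[0])` preserves the boxed type and avoids a per-element cast to a primitive.

```java
import java.util.Arrays;
import java.util.List;

public class ArgListSketch {
    // Stand-in for Calcite's aggregateCall.getArgList(), which returns a List<Integer>.
    public static Integer[] toArgArray(List<Integer> argList) {
        // Keeps the boxed Integer element type; no per-element cast needed.
        return argList.toArray(new Integer[0]);
    }

    public static void main(String[] args) {
        List<Integer> argList = Arrays.asList(0, 2, 3); // e.g. the aggregate reads fields 0, 2 and 3
        System.out.println(Arrays.toString(toArgArray(argList))); // [0, 2, 3]
    }
}
```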
[GitHub] flink pull request #3647: [FLINK-5915] [table] forward the entire aggregate ...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3647 [FLINK-5915] [table] forward the entire aggregate ArgList to aggregate runtime functions This PR partially solved "FLINK-5915 Add support for the aggregate on multi fields". The roadmap of UDAGG would be 1. codeGen all the runtime aggregate functions; 2. add UDAGG tableAPI interface. I would like to kick this PR off earlier as: a) we can finalize the runtime function while doing the codeGen; b) as more and more (over) aggregates are implemented, it would be good if we can finalize the interface and share library to the stage (as we planned) as earlier as possible. Note that: though the entire aggregate ArgList is forwarded to the runtime function, for the functions that have not been codeGened, we will still support only one column aggregate input. Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. 
- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink FLINK5915-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3647.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3647 commit 128479c24003e8f0bd67542f336f9ae20dade352 Author: shaoxuan-wang Date: 2017-03-29T20:41:38Z [FLINK-5915] [table] forward the entire aggregate ArgList to aggregate runtime functions
[GitHub] flink pull request #3646: [FLINK-6216] [table] DataStream unbounded groupby ...
GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3646 [FLINK-6216] [table] DataStream unbounded groupby aggregate with early firing 1. Implemented an unbounded groupby aggregate with early firing (the period is 1, i.e. a result is emitted for every record) 2. Refactored DataStreamAggregate to DataStreamGroupWindowAggregate
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F6216-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3646.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3646 commit 1c634b9f72f23a1e1d9ce973391e8cb9232a1950 Author: shaoxuan-wang Date: 2017-03-29T19:57:58Z [FLINK-6216] [table] DataStream unbounded groupby aggregate with early firing
[jira] [Created] (FLINK-6216) DataStream unbounded groupby aggregate with early firing
Shaoxuan Wang created FLINK-6216: Summary: DataStream unbounded groupby aggregate with early firing Key: FLINK-6216 URL: https://issues.apache.org/jira/browse/FLINK-6216 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Shaoxuan Wang Assignee: Shaoxuan Wang Groupby aggregation results in a replace table. For an infinite groupby aggregate, we need a mechanism to define when the data should be emitted (early-fired). This task aims to implement the initial version of the unbounded groupby aggregate, where we update and emit the aggregate value for each arriving record. In the future, we will implement the mechanism and interface that let users define the frequency/period of early-firing the unbounded groupby aggregation results. The limited space of the backend state is one of the major obstacles to supporting unbounded groupby aggregates in practice. For this reason, we suggest two common (and very useful) use cases for this unbounded groupby aggregate: 1. The range of the grouping key is limited. In this case, a newly arriving record will either be inserted into the state as a new record or replace an existing record in the backend state. The data in the backend state will not be evicted if the resources are properly provisioned by the user, so we can guarantee the correctness of the aggregation results. 2. When the grouping key is unlimited, we will not be able to ensure 100% correctness of the unbounded groupby aggregate. In this case, we will rely on the TTL mechanism of the RocksDB state backend to evict old data, so that we can guarantee correct results within a certain time range. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
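The per-record early-firing behavior described in the issue can be simulated in a few lines of plain Scala (no Flink APIs): every input record updates the keyed state and immediately emits the new aggregate value for its key, so later emissions for the same key replace earlier ones. This is a minimal sketch under those assumptions, using a count aggregate and a mutable map standing in for the keyed backend state:

```scala
import scala.collection.mutable

// Stand-in for the keyed backend state (key -> current aggregate value).
val state = mutable.Map.empty[String, Long]

// Early firing with period 1: accumulate, store, and emit on every record.
def processRecord(key: String): (String, Long) = {
  val updated = state.getOrElse(key, 0L) + 1 // accumulate
  state(key) = updated                       // replace the old value for this key
  (key, updated)                             // early-fire the new result
}

val emitted = Seq("a", "b", "a").map(processRecord)
println(emitted) // List((a,1), (b,1), (a,2))
```

Note how key "a" is emitted twice; a downstream consumer treating the output as a replace table keeps only the latest value per key.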
[jira] [Assigned] (FLINK-6091) Implement and turn on the retraction for grouping window aggregate
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-6091: Assignee: Hequn Cheng (was: Shaoxuan Wang) > Implement and turn on the retraction for grouping window aggregate > > Key: FLINK-6091 > URL: https://issues.apache.org/jira/browse/FLINK-6091 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Hequn Cheng > > Implement the functions for processing retract messages for grouping window > aggregates. No retract-generating function is needed for now, as the current > grouping window aggregates are all executed in "without early firing" mode.
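"Processing retract messages" means each aggregate function must be able to undo a previous accumulation. A hypothetical sketch (plain Scala, not Flink's actual AggregateFunction interface) for a count aggregate could look like this:

```scala
// Hypothetical accumulator for a count aggregate.
class CountAccumulator { var count: Long = 0L }

// accumulate applies a new record; retract undoes a previously applied one.
def accumulate(acc: CountAccumulator): Unit = acc.count += 1
def retract(acc: CountAccumulator): Unit = acc.count -= 1

val acc = new CountAccumulator
accumulate(acc)
accumulate(acc)
retract(acc) // a retraction message arrives for one earlier record
println(acc.count) // 1
```

For each retraction message the runtime would call `retract` on the affected key's accumulator instead of `accumulate`, leaving the aggregate as if the retracted record had never arrived.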
[jira] [Assigned] (FLINK-6090) Implement optimizer for retraction and turn on retraction for over window aggregate
[ https://issues.apache.org/jira/browse/FLINK-6090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaoxuan Wang reassigned FLINK-6090: Assignee: Hequn Cheng (was: Shaoxuan Wang) > Implement optimizer for retraction and turn on retraction for over window > aggregate > > Key: FLINK-6090 > URL: https://issues.apache.org/jira/browse/FLINK-6090 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Hequn Cheng > > Implement the optimizer for retraction and turn on retraction for the over window > aggregate as the first prototype example: > 1. Add the RetractionRule at the decoration stage, which can derive the > replace-table/append-table and NeedRetraction properties. > 2. Match NeedRetraction against the replace table and mark the accumulating mode; > add the necessary retract-generating function at the replace table, and add the > retract-processing logic at the retract consumer. > 3. Turn on retraction for the over window aggregate.
[GitHub] flink pull request #3585: [FLINK-5990][table]Add event time OVER ROWS BETWEE...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3585#discussion_r107649062 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +397,24 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + +} + +object SqlITCase { + + class TimestampWithLatenessWatermark(allowedLateness: Long) --- End diff -- @sunjincheng121 can you please consider changing the allowedLateness to offset: `class AssignerWithWatermarkOffset(watermarkOffset: Long)`
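The reviewer's suggestion can be sketched in plain Scala. This is a hypothetical illustration, not Flink's actual watermark-assigner interface: the assigner tracks the highest timestamp seen and emits a watermark a fixed `watermarkOffset` behind it:

```scala
// Hypothetical sketch of the suggested AssignerWithWatermarkOffset:
// the watermark trails the maximum observed timestamp by a fixed offset.
class AssignerWithWatermarkOffset(watermarkOffset: Long) {
  private var maxTimestamp = Long.MinValue

  // Record the element's timestamp and return it unchanged.
  def extractTimestamp(ts: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, ts)
    ts
  }

  // Watermark = highest timestamp seen so far minus the configured offset.
  def currentWatermark: Long = maxTimestamp - watermarkOffset
}

val assigner = new AssignerWithWatermarkOffset(1000L)
Seq(5000L, 4000L, 7000L).foreach(assigner.extractTimestamp)
println(assigner.currentWatermark) // 6000
```

Framing the parameter as a watermark offset (rather than "allowed lateness") describes directly how far the watermark lags behind event time.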
[GitHub] flink pull request #3564: [FLINK-6089] [table] Implement decoration phase fo...
Github user shaoxuan-wang commented on a diff in the pull request: https://github.com/apache/flink/pull/3564#discussion_r107594707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -162,6 +162,27 @@ abstract class TableEnvironment(val config: TableConfig) { } /** +* Returns the decoration rule set for this environment +* including a custom RuleSet configuration. --- End diff -- Do not unnecessarily break lines.
[GitHub] flink issue #3579: [FLINK-6124] [Table API & SQL] support max/min aggregatio...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3579 @twalthr thanks for the reply and explanation.