[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16534400#comment-16534400 ] Chunhui Shi commented on FLINK-7151: Hi [~suez1224], do you have a Jira for your DDL task? > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: Shuyi Chen >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426316#comment-16426316 ] Shuyi Chen commented on FLINK-7151: --- I don't have a concrete timeline, but will try to implement the table DDL before Flink 1.6 release. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404280#comment-16404280 ] yinhua.dai commented on FLINK-7151: --- [~suez1224] When do you think the feature will be on board? > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16403335#comment-16403335 ] Shuyi Chen commented on FLINK-7151: --- [~yinhua], yes, you should be able to implement your own TableSourceFactory to do so. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401595#comment-16401595 ] yinhua.dai commented on FLINK-7151: --- That's great, thanks [~suez1224] > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401583#comment-16401583 ] Shuyi Chen commented on FLINK-7151: --- Hi [~yinhua], I am actively working on a design doc for Flink SQL DDL. Initially, I am planning to support : CREATE TABLE DROP TABLE CREATE TYPE DROP TYPE CREATE LIBRARY DROP LIBRARY CREATE FUNCTION DROP FUNCTION I am currently working on the Calcite side ( TYPE DDL, LIBRARY DDL and FUNCTION DDL) to come up with a proper DDL grammar, and add necessary functionality in calcite core to support these DDLs in flink. I'll share the design doc soon when it's ready. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401548#comment-16401548 ] yinhua.dai commented on FLINK-7151: --- [~yuemeng] Any update on this functionality, I would like to do something similar. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API SQL >Reporter: yuemeng >Assignee: yuemeng >Priority: Major > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089680#comment-16089680 ] Fabian Hueske commented on FLINK-7151: -- Sure Can you briefly describe how you are planning to address this issue? Right now, all functions are registered as temporary functions. Would this be rather about adding SQL syntax for that or are you going for function support in external catalogs? Thanks, Fabian > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16089346#comment-16089346 ] yuemeng commented on FLINK-7151: [~fhueske] can you assgin this issue to me ,i want to implement this for flink thanks a lot > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083967#comment-16083967 ] Fabian Hueske commented on FLINK-7151: -- Actually, functions are not registered per job but per {{TableEnvironment}}. So you could run multiple queries on using the same {{TableEnvironment}}. Right now there is no way to save or restore a {{TableEnvironment}}. Registering functions to an external catalog and connecting the catalog to the {{TableEnvironment}} sounds like a good idea. > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083845#comment-16083845 ] yuemeng commented on FLINK-7151: [~jark] yes,extend the ExternalCatalog to support register external functions is a very good way ,i think,maybe we can register a function use sql ddl instead of api here some questions: 1) all buitin function in flink will be register to shcema each job cycle.it's a ugly way 2) agg function such as sum,avg should matched by calcite,we can directly get the exact function by aggCall instead of hard code > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083424#comment-16083424 ] Jark Wu commented on FLINK-7151: The idea is very good. But currently, Flink doesn't provide a Catalog (like HCatalog in Hive) which is a storage to manage tables or functions. So all the tables and functions in Flink is job level, not cross jobs (which is "temporary" I think). However, Flink provide {{ExternalCatalog}} API to the connect an external database catalog to Flink's Table API. So I think, maybe you can extend the {{ExternalCatalog}} to support register external functions (which is non-temporary functions). > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083380#comment-16083380 ] yuemeng commented on FLINK-7151: [~fhueske] thanks for your reply. I will split this issue with two different specific title.thank you [~jark] user can use temporary function to create temporary udf or udaf just like hive for their own job. suppose user want to use a special udf(not buitin function) for their own special job.what we do? maybe we can create a temporary function for this job. {code} Hive: create temporary function mytest as 'test.udf.ToLowerCase'; select mytest(test.name) from test; {code} temporary table is conceptual table,not a physical table,we can use it for streaming etl logical {code} CREATE TABLE kafka_source ( id INT, name VARCHAR(100), price INT, comment VARCHAR(100) ) PROPERTIES ( category = 'source', type = 'kafka', version = '0.9.0.1', separator = ',', topic = 'test', brokers = g:9092', group_id = 'test' ); CREATE TABLE tmp ( id INT, name VARCHAR(100), price INT ) PROPERTIES ( category = 'tmp' ); CREATE TABLE db_sink ( id INT, name VARCHAR(100), price INT ) PROPERTIES ( category = 'sink', type = 'mysql', table_name = 'test', url = 'jdbc:mysql://127.0.0.1:3307/ds?useUnicode=true=UTF8', username = 'ds_dev', password = 's]k51_(>R' ); -- INSERT INTO db_sink SELECT id, name,price FROM kafka_source; --INSERT INTO db_sink SELECT id, name, sum(price) FROM kafka_source GROUP BY id,name HAVING sum(price) > 1 AND id < 10; INSERT INTO tmp SELECT id,name,price + 1.0 FROM kafka_source; INSERT into db_sink SELECT id,name,price + 1 as price FROM tmp; {code} > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16083335#comment-16083335 ] Jark Wu commented on FLINK-7151: Can you explain more detail about what is a temporary function/table? > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7151) FLINK SQL support create temporary function and table
[ https://issues.apache.org/jira/browse/FLINK-7151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16082103#comment-16082103 ] Fabian Hueske commented on FLINK-7151: -- Registration of functions and inserting into tables are two unrelated issues. Can you please split this issue? Thank you > FLINK SQL support create temporary function and table > - > > Key: FLINK-7151 > URL: https://issues.apache.org/jira/browse/FLINK-7151 > Project: Flink > Issue Type: New Feature >Reporter: yuemeng > > Based on create temporary function and table.we can register a udf,udaf,udtf > use sql: > {code} > CREATE TEMPORARY function 'TOPK' AS > 'com..aggregate.udaf.distinctUdaf.topk.ITopKUDAF'; > INSERT INTO db_sink SELECT id, TOPK(price, 5, 'DESC') FROM kafka_source GROUP > BY id; > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)