Flink SQL Calcite parsing error
My original SQL:

CREATE TABLE consumer_session_created (
consumer ROW (consumerUuid STRING),
clientIp STRING,
deviceId STRING,
eventInfo ROW < eventTime BIGINT >,
ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, '-MM-dd HH:mm:ss')),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka'
,'topic'='local.dwh.paylater.consumer.session.consumer-session-created.v1'
,'properties.bootstrap.servers'='http://localhost:9092'
,' properties.group.id'='flink-ato-trusted-consumer'
,'scan.startup.mode'='latest-offset'
,'properties.allow.auto.create.topics'='false'
,'format'='avro-confluent'
,'avro-confluent.basic-auth.credentials-source'='null'
,'avro-confluent.basic-auth.user-info'='null'
,'avro-confluent.schema-registry.url'='http://localhost:8081'
,'avro-confluent.schema-registry.subject'='local.dwh.paylater.consumer.session.consumer-session-created.v1')

CREATE TEMPORARY VIEW consumer_session_created_detail as (
SELECT
csc.consumer.consumerUuid as consumer_consumerUuid,
csc.deviceId as deviceId,
csc.clientIp as clientIp,
csc.eventInfo.eventTime as eventInfo_eventTime
FROM consumer_session_created csc
)

SELECT
consumer_consumerUuid AS entity_id,
COUNT(DISTINCT deviceId) OVER w AS sp_c_distinct_device_cnt_by_consumer_id_h1_0,
COUNT (DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
FROM consumer_session_created_detail
WINDOW w AS (
PARTITION BY consumer_consumerUuid
ORDER BY eventInfo_eventTime
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)

The error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 9, column 15 to line 9, column 31: Data Type mismatch between ORDER BY and RANGE clause
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:207)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:715)
    at aptflinkjobs.stream.SQLStreamer.lambda$execute$1(SQLStreamer.java:149)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at aptflinkjobs.stream.SQLStreamer.execute(SQLStreamer.java:141)
    at aptflinkjobs.stream.SQLStreamer.main(SQLStreamer.java:296)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
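One possible reading of this error, offered as a sketch rather than a confirmed diagnosis: the window orders by eventInfo_eventTime, which is a BIGINT, while the RANGE bound is an INTERVAL, so the validator sees two incompatible types. Flink OVER windows also expect the ORDER BY column to be a time attribute. Assuming the watermarked column ts from the table definition is the intended event time, the view could expose it and the window could order on it. The column name event_ts below is an illustrative assumption, not something from the original post.

```sql
-- Sketch: carry the event-time attribute `ts` through the view
-- instead of the raw BIGINT eventInfo.eventTime.
CREATE TEMPORARY VIEW consumer_session_created_detail AS
SELECT
  csc.consumer.consumerUuid AS consumer_consumerUuid,
  csc.deviceId AS deviceId,
  csc.clientIp AS clientIp,
  csc.ts AS event_ts  -- hypothetical column name; `ts` has the watermark
FROM consumer_session_created csc;

-- ORDER BY now uses a timestamp, which matches the INTERVAL range bound.
SELECT
  consumer_consumerUuid AS entity_id,
  COUNT(DISTINCT deviceId) OVER w AS sp_c_distinct_device_cnt_by_consumer_id_h1_0,
  COUNT(DISTINCT clientIp) OVER w AS sp_d_distinct_ip_cnt_by_consumer_id_h1_0
FROM consumer_session_created_detail
WINDOW w AS (
  PARTITION BY consumer_consumerUuid
  ORDER BY event_ts
  RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
);
```

A simple projection with an alias should keep ts usable as a time attribute, which is why the view only renames the column rather than recomputing it.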
Re: [Question] Why doesn't Flink use Calcite adapter?
Hi guangyuan,

This is an interesting and broad topic. I'll try to give my opinion based on my limited knowledge.

Flink introduces dynamic sources to read from an external system [1]. Flink connector modules are completely decoupled from Calcite. There are two benefits:
(1) Users who need to develop a custom, user-defined connector need no background knowledge of Calcite.
(2) Unnecessary external dependencies are removed from the Flink connector modules.

Besides, since Flink is a distributed engine for stateful computations over *unbounded and bounded* data streams, there are more things to take into consideration when connecting to an external system, for example how to read data in parallel and how to store source metadata in state so that it can be restored after a failover.

I list a few of these issues below. They are strongly tied to the Flink engine and are not defined in Calcite's built-in adapters.
(1) Required: define how to read from an external storage system
    1.1 scan all rows, or look up rows by one or more keys
    1.2 in scan mode, define how to split the source and how to store metadata in state so that it can be restored after a failover
(2) Required: map the data types of the external system to the Flink data type system
(3) Optional: for planner optimization, implement ability interfaces, e.g. SupportsProjectionPushDown, SupportsFilterPushDown and so on
(4) Optional: define encoding/decoding formats

Hope it helps. Please correct me if I'm wrong.

Best regards,
JING ZHANG

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/

On Mon, Jun 28, 2021 at 8:28 AM, Israel Ekpo wrote:

> Maybe this question was better addressed to the DEV list.
>
> On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang wrote:
>
>> <https://stackoverflow.com/posts/68108655/timeline>
>>
>> I have read the design doc of the Flink planner recently. I've found that
>> Flink only uses Calcite as an SQL optimizer. It translates an optimized
>> RelNode to a Flink (or Blink) RelNode, and then transforms it into the
>> physical plan. Why doesn't Flink implement Calcite adapters? Isn't this an
>> easier way to use Calcite?
>>
>> Link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html
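The scan-versus-lookup distinction in point 1.1 of Jing's reply is also visible at the SQL level. The sketch below is illustrative only: the table names, columns, and JDBC URL are assumptions, and it presumes the datagen and jdbc connectors are on the classpath. A plain query reads the source in scan mode, while a join with FOR SYSTEM_TIME AS OF on a processing-time attribute asks the planner to look rows up by key.

```sql
-- Hypothetical fact stream; datagen produces random rows.
CREATE TABLE orders (
  order_id BIGINT,
  customer_id BIGINT,
  proc_time AS PROCTIME()  -- processing-time attribute for the lookup join
) WITH (
  'connector' = 'datagen'
);

-- Hypothetical dimension table in an external database.
CREATE TABLE customers (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/shop',  -- assumed URL
  'table-name' = 'customers'
);

-- Scan mode: the source is read in full (and split across subtasks).
SELECT order_id, customer_id FROM orders;

-- Lookup mode: each order triggers a keyed lookup into customers.
SELECT o.order_id, c.name
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.id;
```

Whether a given connector supports lookup, scan, or both depends on which source interfaces it implements, which is the engine-specific concern the reply describes.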
Re: [Question] Why doesn't Flink use Calcite adapter?
Maybe this question was better addressed to the DEV list.

On Fri, Jun 25, 2021 at 11:57 PM guangyuan wang wrote:

> <https://stackoverflow.com/posts/68108655/timeline>
>
> I have read the design doc of the Flink planner recently. I've found that
> Flink only uses Calcite as an SQL optimizer. It translates an optimized
> RelNode to a Flink (or Blink) RelNode, and then transforms it into the
> physical plan. Why doesn't Flink implement Calcite adapters? Isn't this an
> easier way to use Calcite?
>
> Link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html
[Question] Why doesn't Flink use Calcite adapter?
<https://stackoverflow.com/posts/68108655/timeline>

I have read the design doc of the Flink planner recently. I've found that Flink only uses Calcite as an SQL optimizer. It translates an optimized RelNode to a Flink (or Blink) RelNode, and then transforms it into the physical plan. Why doesn't Flink implement Calcite adapters? Isn't this an easier way to use Calcite?

Link to the Calcite adapter docs: calcite.apache.org/docs/adapter.html
Re: Flink and Calcite
Here is also the original design doc:
https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjPcp1h2TVqdI

On Wed, Jul 6, 2016 at 4:00 PM, Márton Balassi <balassi.mar...@gmail.com> wrote:

> Hey Radu,
>
> It is in master; you can find the related module under
> flink-libraries/flink-table in the directory structure. [1]
>
> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/pom.xml#L77-L81
>
> Best,
>
> Marton
>
> On Wed, Jul 6, 2016 at 3:49 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>
>> Hi,
>>
>> Can someone point me to the repository where the integration of Calcite
>> with Flink is available?
>>
>> Does this come with the master branch (as indicated by the link in the
>> blog post)?
>>
>> https://github.com/apache/flink/tree/master
>>
>> Thanks
>>
>> Dr. Radu Tudoran
Re: Flink and Calcite
Hey Radu,

It is in master; you can find the related module under flink-libraries/flink-table in the directory structure. [1]

[1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/pom.xml#L77-L81

Best,

Marton

On Wed, Jul 6, 2016 at 3:49 PM, Radu Tudoran <radu.tudo...@huawei.com> wrote:

> Hi,
>
> Can someone point me to the repository where the integration of Calcite
> with Flink is available?
>
> Does this come with the master branch (as indicated by the link in the
> blog post)?
>
> https://github.com/apache/flink/tree/master
>
> Thanks
>
> Dr. Radu Tudoran
Flink and Calcite
Hi,

Can someone point me to the repository where the integration of Calcite with Flink is available?

Does this come with the master branch (as indicated by the link in the blog post)?

https://github.com/apache/flink/tree/master

Thanks

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!