hi,???????? ?????????????????????????? ??????join????????mysql???? 
????????????source????????mysql????????????????????????????




------------------ ???????? ------------------
??????:&nbsp;"wangweigu...@stevegame.cn"<wangweigu...@stevegame.cn&gt;;
????????:&nbsp;2020??6??9??(??????) ????6:35
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;????: ?????? ????flinksql between????




&nbsp; ????1.10???? useBlinkPlanner????????????useOldPlanner????????????
&nbsp; 
??????????????????
&nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: 
Cannot generate a valid execution plan for the given query: 

LogicalProject(num=[$0])
&nbsp; LogicalJoin(condition=[AND(&gt;($0, $1), <($0, $2))], joinType=[inner])
&nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num])
&nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.




&nbsp;
???????? ??????
?????????? 2020-06-09 17:41
???????? user-zh
?????? ?????? ????flinksql between????
hi????????????&amp;nbsp;
1 flink1.9.0
2 oldplanner
<dependency&amp;gt;
<groupId&amp;gt;org.apache.flink</groupId&amp;gt;
<artifactId&amp;gt;flink-table-api-scala_2.11</artifactId&amp;gt;
<version&amp;gt;1.9.0</version&amp;gt;
</dependency&amp;gt;
&nbsp;
&nbsp;
<dependency&amp;gt;
<groupId&amp;gt;org.apache.flink</groupId&amp;gt;
<artifactId&amp;gt;flink-table-planner_2.11</artifactId&amp;gt;
<version&amp;gt;1.9.0</version&amp;gt;
</dependency&amp;gt;
&nbsp;
3 streaming mode
4. ????????????
&amp;nbsp; &amp;nbsp; val sqlStream = env.createInput(jdbcInput)
&amp;nbsp; &amp;nbsp; 
tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
&amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'ip)
//&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select * from&amp;nbsp; 
OMstream as&amp;nbsp; a left join sqlStream as&amp;nbsp; b on a.ip 
&amp;gt;b.start_ip and a.ip<b.end_ip")
&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id 
from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip 
&amp;gt; b.start_ip and a.ip <b.end_ip ")
&amp;nbsp; &amp;nbsp; val resRow = table.toRetractStream[Row]
&nbsp;
5 ????????????
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query:&amp;nbsp;
&nbsp;
&nbsp;
LogicalProject(netstruct_id=[$1])
&amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $2), <($0, $3))], 
joinType=[left])
&amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
&amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, 
start_ip, end_ip])
&nbsp;
&nbsp;
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
at 
org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
at 
org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
at 
org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
at 
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
&nbsp;
&nbsp;
&nbsp;
&nbsp;
&nbsp;
6 ??????????????&amp;nbsp;
select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join 
sqlStream as b on a.ip &amp;gt; b.start_ip
??????????????????????????????&amp;nbsp;
&nbsp;
&nbsp;
??????
&nbsp;
&nbsp;
&nbsp;
&nbsp;
------------------&amp;nbsp;????????&amp;nbsp;------------------
??????:&amp;nbsp;"Benchao Li"<libenc...@apache.org&amp;gt;;
????????:&amp;nbsp;2020??6??9??(??????) ????4:37
??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&nbsp;
????:&amp;nbsp;Re: ????flinksql between????
&nbsp;
&nbsp;
&nbsp;
????????????????????????
1. ????????Flink????????
2. ??????planner????blink planner????old planner??
3. ??????streaming mode????batch mode??
4. ??????????????????????
&nbsp;
?????? <932460...@qq.com&amp;gt; ??2020??6??9?????? ????4:26??????
&nbsp;
&amp;gt; hi??????flinksql?????? select * from a join b on a.ip <b.startip and 
a.ip
&amp;gt; &amp;amp;gt;b.endip ???????????? ???????????? 
??????????????????between??????????????

回复