你的意思是你的mysql维表是自定义的,然后是定期更新的维表内容是么?只要你实现的是LookupSource,应该是没问题的。
内部实现你可以自己控制。

小屁孩 <932460...@qq.com> 于2020年6月10日周三 上午10:46写道:

> hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表
> 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗?
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"wangweigu...@stevegame.cn"<wangweigu...@stevegame.cn&gt;;
> 发送时间:&nbsp;2020年6月9日(星期二) 晚上6:35
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;回复: 回复: 关于flinksql between问题
>
>
>
>
> &nbsp; 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的!
> &nbsp;
> 会报你下面的错误:
> &nbsp; Exception in thread "main"
> org.apache.flink.table.api.TableException: Cannot generate a valid
> execution plan for the given query:
>
> LogicalProject(num=[$0])
> &nbsp; LogicalJoin(condition=[AND(&gt;($0, $1), <($0, $2))],
> joinType=[inner])
> &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num])
> &nbsp;&nbsp;&nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum,
> endNum])
>
> This exception indicates that the query uses an unsupported SQL feature.
>
>
>
>
> &nbsp;
> 发件人: 小屁孩
> 发送时间: 2020-06-09 17:41
> 收件人: user-zh
> 主题: 回复: 关于flinksql between问题
> hi,我使用的是&amp;nbsp;
> 1 flink1.9.0
> 2 oldplanner
> <dependency&amp;gt;
> <groupId&amp;gt;org.apache.flink</groupId&amp;gt;
> <artifactId&amp;gt;flink-table-api-scala_2.11</artifactId&amp;gt;
> <version&amp;gt;1.9.0</version&amp;gt;
> </dependency&amp;gt;
> &nbsp;
> &nbsp;
> <dependency&amp;gt;
> <groupId&amp;gt;org.apache.flink</groupId&amp;gt;
> <artifactId&amp;gt;flink-table-planner_2.11</artifactId&amp;gt;
> <version&amp;gt;1.9.0</version&amp;gt;
> </dependency&amp;gt;
> &nbsp;
> 3 streaming mode
> 4. 代码类似如下
> &amp;nbsp; &amp;nbsp; val sqlStream = env.createInput(jdbcInput)
> &amp;nbsp; &amp;nbsp;
> tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
> &amp;nbsp; &amp;nbsp; tnv.registerDataStream("OMstream",value,'ip)
> //&amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select * from&amp;nbsp;
> OMstream as&amp;nbsp; a left join sqlStream as&amp;nbsp; b on a.ip
> &amp;gt;b.start_ip and a.ip<b.end_ip")
> &amp;nbsp; &amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id
> from&amp;nbsp; OMstream as&amp;nbsp; a left join sqlStream as b on a.ip
> &amp;gt; b.start_ip and a.ip <b.end_ip ")
> &amp;nbsp; &amp;nbsp; val resRow = table.toRetractStream[Row]
> &nbsp;
> 5 报错信息如下
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:&amp;nbsp;
> &nbsp;
> &nbsp;
> LogicalProject(netstruct_id=[$1])
> &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $2), <($0, $3))],
> joinType=[left])
> &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip])
> &amp;nbsp; &amp;nbsp; FlinkLogicalDataStreamScan(id=[2],
> fields=[netstruct_id, start_ip, end_ip])
> &nbsp;
> &nbsp;
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at org.apache.flink.table.planner.StreamPlanner.org
> $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> &nbsp;
> &nbsp;
> &nbsp;
> &nbsp;
> &nbsp;
> 6 我也尝试使用了&amp;nbsp;
> select b.netstruct_id from&amp;nbsp; OMstream as&amp;nbsp; a left join
> sqlStream as b on a.ip &amp;gt; b.start_ip
> 同样是单个大小比较也是不可以的&amp;nbsp;
> &nbsp;
> &nbsp;
> 谢谢!
> &nbsp;
> &nbsp;
> &nbsp;
> &nbsp;
> ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> 发件人:&amp;nbsp;"Benchao Li"<libenc...@apache.org&amp;gt;;
> 发送时间:&amp;nbsp;2020年6月9日(星期二) 下午4:37
> 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &nbsp;
> 主题:&amp;nbsp;Re: 关于flinksql between问题
> &nbsp;
> &nbsp;
> &nbsp;
> 方便补充一下以下信息么?
> 1. 你使用的Flink的版本?
> 2. 使用的planner,是blink planner还是old planner?
> 3. 用的是streaming mode还是batch mode?
> 4. 具体的报错信息是什么?
> &nbsp;
> 小屁孩 <932460...@qq.com&amp;gt; 于2020年6月9日周二 下午4:26写道:
> &nbsp;
> &amp;gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and
> a.ip
> &amp;gt; &amp;amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用

回复