你的意思是你的mysql维表是自定义的,然后是定期更新的维表内容是么?只要你实现的是LookupSource,应该是没问题的。 内部实现你可以自己控制。
小屁孩 <932460...@qq.com> 于2020年6月10日周三 上午10:46写道: > hi,感谢指导 已经可以实现,可以再问一下 如果我join的是一个mysql维表 > 我是自定义的source定时更新mysql表这样跟流表关联是可以的吗? > > > > > ------------------ 原始邮件 ------------------ > 发件人: "wangweigu...@stevegame.cn"<wangweigu...@stevegame.cn>; > 发送时间: 2020年6月9日(星期二) 晚上6:35 > 收件人: "user-zh"<user-zh@flink.apache.org>; > > 主题: 回复: 回复: 关于flinksql between问题 > > > > > 我在1.10中用 useBlinkPlanner是可以的,用useOldPlanner是不可以的! > > 会报你下面的错误: > Exception in thread "main" > org.apache.flink.table.api.TableException: Cannot generate a valid > execution plan for the given query: > > LogicalProject(num=[$0]) > LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], > joinType=[inner]) > FlinkLogicalDataStreamScan(id=[1], fields=[num]) > FlinkLogicalDataStreamScan(id=[2], fields=[startNum, > endNum]) > > This exception indicates that the query uses an unsupported SQL feature. > > > > > > 发件人: 小屁孩 > 发送时间: 2020-06-09 17:41 > 收件人: user-zh > 主题: 回复: 关于flinksql between问题 > hi,我使用的是&nbsp; > 1 flink1.9.0 > 2 oldplanner > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-api-scala_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > > <dependency&gt; > <groupId&gt;org.apache.flink</groupId&gt; > <artifactId&gt;flink-table-planner_2.11</artifactId&gt; > <version&gt;1.9.0</version&gt; > </dependency&gt; > > 3 streaming mode > 4. 代码类似如下 > &nbsp; &nbsp; val sqlStream = env.createInput(jdbcInput) > &nbsp; &nbsp; > tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'ip) > //&nbsp; &nbsp; val table = tnv.sqlQuery("select * from&nbsp; > OMstream as&nbsp; a left join sqlStream as&nbsp; b on a.ip > &gt;b.start_ip and a.ip<b.end_ip") > &nbsp; &nbsp; val table = tnv.sqlQuery("select b.netstruct_id > from&nbsp; OMstream as&nbsp; a left join sqlStream as b on a.ip > &gt; b.start_ip and a.ip <b.end_ip ") > &nbsp; &nbsp; val resRow = table.toRetractStream[Row] > > 5 报错信息如下 > Exception in thread "main" org.apache.flink.table.api.TableException: > Cannot generate a valid execution plan for the given query:&nbsp; > > > LogicalProject(netstruct_id=[$1]) > &nbsp; LogicalJoin(condition=[AND(&gt;($0, $2), <($0, $3))], > joinType=[left]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &nbsp; &nbsp; FlinkLogicalDataStreamScan(id=[2], > fields=[netstruct_id, start_ip, end_ip]) > > > This exception indicates that the query uses an unsupported SQL feature. > Please check the documentation for the set of currently supported SQL > features. > at > org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > at > org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > at > org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > at > org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > at org.apache.flink.table.planner.StreamPlanner.org > $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > at > org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at > org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > at > org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > at > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > > > > > > 6 我也尝试使用了&nbsp; > select b.netstruct_id from&nbsp; OMstream as&nbsp; a left join > sqlStream as b on a.ip &gt; b.start_ip > 同样是单个大小比较也是不可以的&nbsp; > > > 谢谢! > > > > > ------------------&nbsp;原始邮件&nbsp;------------------ > 发件人:&nbsp;"Benchao Li"<libenc...@apache.org&gt;; > 发送时间:&nbsp;2020年6月9日(星期二) 下午4:37 > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; > > 主题:&nbsp;Re: 关于flinksql between问题 > > > > 方便补充一下以下信息么? > 1. 你使用的Flink的版本? > 2. 使用的planner,是blink planner还是old planner? > 3. 用的是streaming mode还是batch mode? > 4. 具体的报错信息是什么? > > 小屁孩 <932460...@qq.com&gt; 于2020年6月9日周二 下午4:26写道: > > &gt; hi,我在flinksql中使用 select * from a join b on a.ip <b.startip and > a.ip > &gt; &amp;gt;b.endip 报了一个错误 不支持此功能 类似这种有没有类似between的函数可以使用