Re: Re: About the Flink SQL BETWEEN question

2020-06-11 post by 小屁孩
After joining the MySQL dimension table I need a GROUP BY:
stream A(id, ip), MySQL table B(startip, endip, area_id).
I use A.ip BETWEEN B.startip AND B.endip to fetch area_id,
then aggregate by area_id.
How should this BETWEEN-style lookup be written in SQL? My query:

  val table = tnv.sqlQuery("select a.*, b.area_id as s_area_id, b.unit_id as s_unit_id, (ip_to_num(b.end_ip) - ip_to_num(b.start_ip)) as scoped from OMstream as a left join sqlStream as b on ip_to_num(a.s_ip) >= ip_to_num(b.start_ip) and ip_to_num(a.s_ip) <= ip_to_num(b.end_ip)")

log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
 Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Too many fields referenced from an atomic type.
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
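The queries in this thread rely on an `ip_to_num` UDF whose body never appears in the thread. A minimal sketch of what such a UDF presumably does (an assumption on my part, shown in Python for illustration): pack the four IPv4 octets into one integer so that a BETWEEN range predicate becomes a plain numeric comparison.

```python
def ip_to_num(ip: str) -> int:
    """Pack a dotted IPv4 string into its 32-bit integer value."""
    a, b, c, d = (int(part) for part in ip.split("."))
    return (a << 24) | (b << 16) | (c << 8) | d

# Numeric order now matches IP range order, so
# start_ip <= ip <= end_ip can be tested on the integers.
```

With such a mapping, `ip BETWEEN start_ip AND end_ip` on the numeric values matches the textual ranges stored in table B.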
 
 
 
 
 

Re: About the Flink SQL BETWEEN question

2020-06-10 post by Leonard Xu
> 
> Thanks a lot. On Flink 1.10.0, is there a limit on the number of fields when converting a stream to a table? I convert my stream to a table, and the stream is a wrapped entity class.

There is no limit on the number of fields in that conversion, and judging by your code the schema is not even large; a few hundred fields is perfectly normal in practice. Please double-check the mapping between your stream's fields and the declared ones.

Best,
Leonard Xu



Re: Re: About the Flink SQL BETWEEN question

2020-06-10 post by wangweigu...@stevegame.cn

  What are the types of your `value` and `mysqlinst` streams?


 

Re: About the Flink SQL BETWEEN question

2020-06-10 post by 小屁孩
I'm using Flink 1.10.0. Is there a limit on the number of fields when converting a stream to a table? My stream is a wrapped entity class. The registration code:

tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
  
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
 
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
  tnv.registerFunction("ip_to_num",IPtoNum)

When converting the stream to a table, the following error occurs:
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.
Exception in thread "main" org.apache.flink.table.api.ValidationException: Too 
many fields referenced from an atomic type.

Re: About the Flink SQL BETWEEN question

2020-06-09 post by Leonard Xu
Hi,

From your description you have a custom source as the left table and want to join it against a MySQL dimension table. If so, your SQL looks a bit off: what you wrote is a regular join, while the dimension-table join has its own syntax [1]:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

Also, the JDBC connector implements LookupSource, i.e. it already supports acting as a dimension table, and the connector.lookup.cache.ttl option [2] controls the expiry of cached dimension rows. Perhaps that already meets your needs.

Best,
Leonard Xu

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
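As a concrete illustration of the JDBC lookup options in [2], a dimension table backed by the Flink 1.10 JDBC connector could be declared roughly like this. This is a sketch only: the table name, URL, and credentials are placeholders, and the option keys follow the 1.10 `connector.*` property style.

```sql
CREATE TABLE ip_area (
  start_ip VARCHAR,
  end_ip VARCHAR,
  area_id INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/mydb',
  'connector.table' = 'ip_area',
  'connector.username' = 'user',
  'connector.password' = 'password',
  -- lookup cache: rows are cached up to the TTL, bounding reads against MySQL
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '10min'
);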


Re: Re: About the Flink SQL BETWEEN question

2020-06-09 post by Benchao Li
You mean your MySQL dimension table is custom-built and its contents are refreshed periodically? As long as what you implement is a LookupSource, that should be fine;
you are free to control the internal implementation yourself.
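The refresh-with-TTL idea behind such a dimension table can be pictured as an in-memory snapshot plus a reload timer: lookups hit the snapshot, and the snapshot is reloaded once the TTL lapses. A minimal sketch in Python for illustration (all names hypothetical, not Flink API):

```python
import time

class RefreshingDimTable:
    """Sketch of a periodically refreshed dimension table:
    lookups hit an in-memory snapshot reloaded after `ttl_seconds`."""

    def __init__(self, loader, ttl_seconds):
        self.loader = loader          # callable fetching the full table (e.g. from MySQL)
        self.ttl = ttl_seconds
        self.snapshot = loader()      # initial load
        self.loaded_at = time.monotonic()

    def lookup(self, key):
        # Reload the whole snapshot once it is older than the TTL.
        if time.monotonic() - self.loaded_at >= self.ttl:
            self.snapshot = self.loader()
            self.loaded_at = time.monotonic()
        return self.snapshot.get(key)
```

In Flink terms, `loader` corresponds to re-reading the MySQL table, and `ttl_seconds` plays the role of `connector.lookup.cache.ttl`.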

小屁孩 <932460...@qq.com> wrote on Wednesday, June 10, 2020, 10:46 AM:

> 
> From: 小屁孩
> Sent: 2020-06-09 17:41
> To: user-zh
> Subject: Re: About the Flink SQL BETWEEN question
> hi, I'm using:
> 1. flink 1.9.0
> 2. old planner
> 3. streaming mode
> 4. code roughly as follows:
>     val sqlStream = env.createInput(jdbcInput)
>     tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
>     tnv.registerDataStream("OMstream",value,'ip)
> //  val table = tnv.sqlQuery("select * from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val table = tnv.sqlQuery("select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip and a.ip < b.end_ip")
>     val resRow = table.toRetractStream[Row]
> 
> 5. The error:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Cannot generate a valid execution plan for the given query:
> 
> 
> LogicalProject(netstruct_id=[$1])
>   LogicalJoin(condition=[AND(>($0, $2), <($0, $3))], joinType=[left])
>     FlinkLogicalDataStreamScan(id=[1], fields=[ip])
>     FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
> 
> 
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL
> features.
> at
> org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
> at
> org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
> at
> org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
> at org.apache.flink.table.planner.StreamPlanner.org
> $apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
> at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
> at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
> 
> 
> 
> 
> 
> 6. I also tried:
> select b.netstruct_id from OMstream as a left join sqlStream as b on a.ip > b.start_ip
> A single inequality comparison fails in the same way.
> 
> Thanks!


Re: Re: About the Flink SQL BETWEEN question

2020-06-09 post by 小屁孩
hi, thanks for the guidance, it works now. One more question: if what I join is a MySQL dimension table, and I use a custom source that periodically reloads the MySQL table, is it OK to join that against the stream table?

Re: Re: About the Flink SQL BETWEEN question

2020-06-09 post by wangweigu...@stevegame.cn

  In 1.10 it works for me with useBlinkPlanner, but not with useOldPlanner!

  It reports the error below:
  Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalProject(num=[$0])
  LogicalJoin(condition=[AND(>($0, $1), <($0, $2))], joinType=[inner])
FlinkLogicalDataStreamScan(id=[1], fields=[num])
FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])

This exception indicates that the query uses an unsupported SQL feature.




 


Re: About the Flink SQL BETWEEN question

2020-06-09 post by 小屁孩
hi, I'm using:
1. flink 1.9.0
2. old planner


Re: About the Flink SQL BETWEEN question

2020-06-09 post by Benchao Li
Could you add the following information?
1. Which Flink version are you using?
2. Which planner: the blink planner or the old planner?
3. Streaming mode or batch mode?
4. What is the exact error message?

小屁孩 <932460...@qq.com> wrote on Tuesday, June 9, 2020, 4:26 PM:

> hi, in Flink SQL I ran select * from a join b on a.ip > b.startip and a.ip < b.endip and got an error saying the feature is unsupported. Is there a BETWEEN-like function I can use for this?
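For reference, BETWEEN is only shorthand for a pair of comparisons, which is why the replies in this thread rewrite it with > and < (or >= and <=). A tiny Python sketch of the intended range lookup, with made-up numeric ranges:

```python
# BETWEEN start AND end is equivalent to: start <= value <= end (inclusive).
def between(value, start, end):
    return start <= value <= end

# Hypothetical numeric IP ranges mapped to area ids.
ranges = [(100, 200, "area-1"), (300, 400, "area-2")]

def lookup_area(ip):
    """Return the area id of the first range containing ip, else None."""
    for start, end, area in ranges:
        if between(ip, start, end):
            return area
    return None
```

Whether the engine accepts the predicate is a planner question, not a predicate question: the old planner rejects this range join while the blink planner accepts it, as noted above.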


About the Flink SQL BETWEEN question

2020-06-09 post by 小屁孩
hi, in Flink SQL I used select * from a join b on a.ip