???? ????????????????????????????join????mysql?? ????????group by 
?????????????????????????????????????????????????????????? A(id,ip) 
mysql??B(startip,endip,area_id)
??????????A.ip between(B.startip,B.endIp) ??????????????area_id 
??????????????area_id
????????????????????sql between????????????ID ??????????
??????????????
    val table = tnv.sqlQuery("select a.*,b.area_id as 
s_area_id,b.unit_id as s_unit_id,(ip_to_num(b.end_ip)-ip_to_num(b.start_ip)) as 
scoped  from  OMstream as a left join sqlStream as b on 
ip_to_num(a.s_ip) > ip_to_num(b.start_ip) and ip_to_num(a.s_ip) 
<ip_to_num(b.end_ip) and a.device_id=b.device_id")
&nbsp; &nbsp; val value2: DataStream[(Boolean, Row)] = 
table.toRetractStream[Row]
&nbsp; &nbsp; val value3 = value2.filter(_._1)&nbsp; //?????????????????????? 
??????????????????
&nbsp; &nbsp; val value4 =value3.map(x =&gt; {
&nbsp; &nbsp; &nbsp; val mes =info(x._2.getField(0).toString, 
x._2.getField(1).toString, x._2.getField(2).toString, 
x._2.getField(3).toString, x._2.getField(4).toString, 
x._2.getField(5).toString, x._2.getField(6).toString, 
x._2.getField(7).toString, x._2.getField(8).toString, x._2.getField(9).toString,
&nbsp; &nbsp; &nbsp; &nbsp; x._2.getField(10).toString, 
x._2.getField(11).toString.toInt, x._2.getField(12).toString, 
x._2.getField(13).toString, x._2.getField(14).toString, 
x._2.getField(15).toString, x._2.getField(16).toString, 
x._2.getField(17).toString, x._2.getField(18).toString.toInt, 
x._2.getField(19).toString, x._2.getField(20).toString, 
x._2.getField(21).toString,
&nbsp; &nbsp; &nbsp; &nbsp; x._2.getField(22).toString, 
x._2.getField(23).toString, x._2.getField(24).toString, 
x._2.getField(25).toString, x._2.getField(26).toString.toInt, 
x._2.getField(27).toString.toInt, x._2.getField(28).toString.toInt)
&nbsp; &nbsp; &nbsp; mes
&nbsp; &nbsp; }) //??????????????row????case class&nbsp;
&nbsp; &nbsp; value4.print&nbsp; //??????????????
&nbsp; &nbsp; value4.keyBy("rowkey").timeWindow(Time.seconds(2)).minBy("scoped")



------------------&nbsp;????????&nbsp;------------------
??????:&nbsp;"Leonard Xu"<xbjt...@gmail.com&gt;;
????????:&nbsp;2020??6??10??(??????) ????8:28
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

????:&nbsp;Re: ????flinksql between????



&gt; 
&gt; ????????????????flink1.10.0 ?????????????????????????????????? 
???????????? ????????????????????????

????????????????????????????????????????????????????????????????????????????????????????????????????????????????????

??????
Leonard Xu


&gt; 
tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id)
&gt; &amp;nbsp; &amp;nbsp; 
tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num,
&gt; &amp;nbsp; 
'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id)
&gt; &amp;nbsp; &amp;nbsp; tnv.registerFunction("ip_to_num",IPtoNum)
&gt; 
&gt; &amp;nbsp;?????????? ????????
&gt; log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig 
<http://logging.apache.org/log4j/1.2/faq.html#noconfig&gt; for more info.
&gt; Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Too many fields referenced from an atomic type.
&gt;    at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388)
&gt;    at 
org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259)
&gt;    at 
org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227)
&gt;    at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237)
&gt;    at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236)
&gt;    at scala.Option.map(Option.scala:146)
&gt;    at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236)
&gt;    at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81)
&gt;    at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94)
&gt;    at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77)
&gt;    at 
com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala)
&gt; 
&gt; 
&gt; 
&gt; 
&gt; 
&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt; ??????:&amp;nbsp;"Leonard Xu"<xbjt...@gmail.com 
<mailto:xbjt...@gmail.com&gt;&amp;gt;;
&gt; ????????:&amp;nbsp;2020??6??10??(??????) ????1:16
&gt; ??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org 
<mailto:user-zh@flink.apache.org&gt;&amp;gt;;
&gt; 
&gt; ????:&amp;nbsp;Re: ????flinksql between????
&gt; 
&gt; 
&gt; 
&gt; Hi,
&gt; 
&gt; ????????????????????????source(????)??&amp;nbsp; ??????????mysql 
??????join??????????????????????sql????????????????????????????regular join, 
????join??????[1]??
&gt; 
&gt; SELECT
&gt; &amp;nbsp; o.amout, o.currency, r.rate, o.amount * r.rate
&gt; FROM
&gt; &amp;nbsp; Orders AS o
&gt; &amp;nbsp; JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
&gt; &amp;nbsp; ON r.currency = o.currency
&gt; ????JDBC connector????????LookupSource?????????????????????????????????? 
connector.lookup.cache.ttl 
??????????????cache????????????????????????????????????
&gt; 
&gt; Best,
&gt; Leonard Xu
&gt; 
&gt; [1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt;<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&amp;gt
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&amp;gt&gt;;
&gt; [2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt;
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&amp;gt
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&amp;gt&gt;;
&gt; 
&gt; &amp;gt; ?? 2020??6??10????10:43???????? <932460...@qq.com 
<mailto:932460...@qq.com&gt;&amp;gt; ??????
&gt; &amp;gt; 
&gt; &amp;gt; hi,???????? ?????????????????????????? 
??????join????????mysql???? 
????????????source????????mysql????????????????????????????
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------
&gt; &amp;gt; ??????:&amp;amp;nbsp;"wangweigu...@stevegame.cn 
<mailto:wangweigu...@stevegame.cn&gt;";<wangweigu...@stevegame.cn 
<mailto:wangweigu...@stevegame.cn&gt;&amp;amp;gt;;
&gt; &amp;gt; ????????:&amp;amp;nbsp;2020??6??9??(??????) ????6:35
&gt; &amp;gt; ??????:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org 
<mailto:user-zh@flink.apache.org&gt;&amp;amp;gt;;
&gt; &amp;gt; 
&gt; &amp;gt; ????:&amp;amp;nbsp;????: ?????? ????flinksql between????
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; &amp;amp;nbsp; ????1.10???? 
useBlinkPlanner????????????useOldPlanner????????????
&gt; &amp;gt; &amp;amp;nbsp; 
&gt; &amp;gt; ??????????????????
&gt; &amp;gt; &amp;amp;nbsp; Exception in thread "main" 
org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query: 
&gt; &amp;gt; 
&gt; &amp;gt; LogicalProject(num=[$0])
&gt; &amp;gt; &amp;amp;nbsp; LogicalJoin(condition=[AND(&amp;amp;gt;($0, $1), 
<($0, $2))], joinType=[inner])
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
FlinkLogicalDataStreamScan(id=[1], fields=[num])
&gt; &amp;gt; &amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp; 
FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum])
&gt; &amp;gt; 
&gt; &amp;gt; This exception indicates that the query uses an unsupported SQL 
feature.
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; 
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; ???????? ??????
&gt; &amp;gt; ?????????? 2020-06-09 17:41
&gt; &amp;gt; ???????? user-zh
&gt; &amp;gt; ?????? ?????? ????flinksql between????
&gt; &amp;gt; hi????????????&amp;amp;amp;nbsp;
&gt; &amp;gt; 1 flink1.9.0
&gt; &amp;gt; 2 oldplanner
&gt; &amp;gt; <dependency&amp;amp;amp;gt;
&gt; &amp;gt; <groupId&amp;amp;amp;gt;org.apache.flink</groupId&amp;amp;amp;gt;
&gt; &amp;gt; 
<artifactId&amp;amp;amp;gt;flink-table-api-scala_2.11</artifactId&amp;amp;amp;gt;
&gt; &amp;gt; <version&amp;amp;amp;gt;1.9.0</version&amp;amp;amp;gt;
&gt; &amp;gt; </dependency&amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; <dependency&amp;amp;amp;gt;
&gt; &amp;gt; <groupId&amp;amp;amp;gt;org.apache.flink</groupId&amp;amp;amp;gt;
&gt; &amp;gt; 
<artifactId&amp;amp;amp;gt;flink-table-planner_2.11</artifactId&amp;amp;amp;gt;
&gt; &amp;gt; <version&amp;amp;amp;gt;1.9.0</version&amp;amp;amp;gt;
&gt; &amp;gt; </dependency&amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 3 streaming mode
&gt; &amp;gt; 4. ????????????
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; val sqlStream = 
env.createInput(jdbcInput)
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; 
tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip)
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; 
tnv.registerDataStream("OMstream",value,'ip)
&gt; &amp;gt; //&amp;amp;amp;nbsp; &amp;amp;amp;nbsp; val table = 
tnv.sqlQuery("select * from&amp;amp;amp;nbsp; OMstream as&amp;amp;amp;nbsp; a 
left join sqlStream as&amp;amp;amp;nbsp; b on a.ip &amp;amp;amp;gt;b.start_ip 
and a.ip<b.end_ip")
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; val table = 
tnv.sqlQuery("select b.netstruct_id from&amp;amp;amp;nbsp; OMstream 
as&amp;amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;amp;gt; 
b.start_ip and a.ip <b.end_ip ")
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; val resRow = 
table.toRetractStream[Row]
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 5 ????????????
&gt; &amp;gt; Exception in thread "main" 
org.apache.flink.table.api.TableException: Cannot generate a valid execution 
plan for the given query:&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; LogicalProject(netstruct_id=[$1])
&gt; &amp;gt; &amp;amp;amp;nbsp; 
LogicalJoin(condition=[AND(&amp;amp;amp;gt;($0, $2), <($0, $3))], 
joinType=[left])
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; 
FlinkLogicalDataStreamScan(id=[1], fields=[ip])
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp; 
FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip])
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; This exception indicates that the query uses an unsupported SQL 
feature.
&gt; &amp;gt; Please check the documentation for the set of currently supported 
SQL features.
&gt; &amp;gt; at 
org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
&gt; &amp;gt; at 
org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160)
&gt; &amp;gt; at 
org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66)
&gt; &amp;gt; at 
org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
&gt; &amp;gt; at 
org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182)
&gt; &amp;gt; at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; &amp;gt; at 
org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127)
&gt; &amp;gt; at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; &amp;gt; at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&gt; &amp;gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891)
&gt; &amp;gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
&gt; &amp;gt; at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
&gt; &amp;gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
&gt; &amp;gt; at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&gt; &amp;gt; at scala.collection.AbstractTraversable.map(Traversable.scala:104)
&gt; &amp;gt; at 
org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
&gt; &amp;gt; at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201)
&gt; &amp;gt; at 
org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124)
&gt; &amp;gt; at 
org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
&gt; &amp;gt; at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37)
&gt; &amp;gt; at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala)
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 6 ??????????????&amp;amp;amp;nbsp;
&gt; &amp;gt; select b.netstruct_id from&amp;amp;amp;nbsp; OMstream 
as&amp;amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;amp;gt; 
b.start_ip
&gt; &amp;gt; ??????????????????????????????&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; ??????
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; 
------------------&amp;amp;amp;nbsp;????????&amp;amp;amp;nbsp;------------------
&gt; &amp;gt; ??????:&amp;amp;amp;nbsp;"Benchao Li"<libenc...@apache.org 
<mailto:libenc...@apache.org&gt;&amp;amp;amp;gt;;
&gt; &amp;gt; ????????:&amp;amp;amp;nbsp;2020??6??9??(??????) ????4:37
&gt; &amp;gt; ??????:&amp;amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org 
<mailto:user-zh@flink.apache.org&gt;&amp;amp;amp;gt;;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; ????:&amp;amp;amp;nbsp;Re: ????flinksql between????
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; ????????????????????????
&gt; &amp;gt; 1. ????????Flink????????
&gt; &amp;gt; 2. ??????planner????blink planner????old planner??
&gt; &amp;gt; 3. ??????streaming mode????batch mode??
&gt; &amp;gt; 4. ??????????????????????
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; ?????? <932460...@qq.com&amp;amp;amp;gt; ??2020??6??9?????? 
????4:26??????
&gt; &amp;gt; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;gt; hi??????flinksql?????? select * from a join b on 
a.ip <b.startip and a.ip
&gt; &amp;gt; &amp;amp;amp;gt; &amp;amp;amp;amp;gt;b.endip ???????????? 
???????????? ??????????????????between??????????????

回复