???? ????????????????????????????join????mysql?? ????????group by ?????????????????????????????????????????????????????????? A(id,ip) mysql??B(startip,endip,area_id) ??????????A.ip between(B.startip,B.endIp) ??????????????area_id ??????????????area_id ????????????????????sql between????????????ID ?????????? ?????????????? val table = tnv.sqlQuery("select a.*,b.area_id as s_area_id,b.unit_id as s_unit_id,(ip_to_num(b.end_ip)-ip_to_num(b.start_ip)) as scoped from OMstream as a left join sqlStream as b on ip_to_num(a.s_ip) > ip_to_num(b.start_ip) and ip_to_num(a.s_ip) <ip_to_num(b.end_ip) and a.device_id=b.device_id") val value2: DataStream[(Boolean, Row)] = table.toRetractStream[Row] val value3 = value2.filter(_._1) //?????????????????????? ?????????????????? val value4 =value3.map(x => { val mes =info(x._2.getField(0).toString, x._2.getField(1).toString, x._2.getField(2).toString, x._2.getField(3).toString, x._2.getField(4).toString, x._2.getField(5).toString, x._2.getField(6).toString, x._2.getField(7).toString, x._2.getField(8).toString, x._2.getField(9).toString, x._2.getField(10).toString, x._2.getField(11).toString.toInt, x._2.getField(12).toString, x._2.getField(13).toString, x._2.getField(14).toString, x._2.getField(15).toString, x._2.getField(16).toString, x._2.getField(17).toString, x._2.getField(18).toString.toInt, x._2.getField(19).toString, x._2.getField(20).toString, x._2.getField(21).toString, x._2.getField(22).toString, x._2.getField(23).toString, x._2.getField(24).toString, x._2.getField(25).toString, x._2.getField(26).toString.toInt, x._2.getField(27).toString.toInt, x._2.getField(28).toString.toInt) mes }) //??????????????row????case class value4.print //?????????????? value4.keyBy("rowkey").timeWindow(Time.seconds(2)).minBy("scoped")
------------------ ???????? ------------------ ??????: "Leonard Xu"<xbjt...@gmail.com>; ????????: 2020??6??10??(??????) ????8:28 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ????flinksql between???? > > ????????????????flink1.10.0 ?????????????????????????????????? ???????????? ???????????????????????? ???????????????????????????????????????????????????????????????????????????????????????????????????????????????????? ?????? Leonard Xu > tnv.registerDataStream("sqlStream",mysqlinst,'start_ip,'end_ip,'area_id,'unit_id,'device_id) > &nbsp; &nbsp; tnv.registerDataStream("OMstream",value,'original_network_id,'asset_id,'types,'d_ip,'d_port,'s_ip,'s_port,'devip,'url,'common_des,'operation_des,'raw_log,'severity,'happen_time,'create_time,'s_ip_num, > &nbsp; 'd_ip_num,'method,'asset_area_id,'device_id,'s_mac,'d_mac,'scope,'dcope,'s_asset_id,'d_asset_id,'asset_unit_id,'area_id,'unit_id,'enterprise_id) > &nbsp; &nbsp; tnv.registerFunction("ip_to_num",IPtoNum) > > &nbsp;?????????? ???????? > log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig <http://logging.apache.org/log4j/1.2/faq.html#noconfig> for more info. > Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type. > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfoFromAtomicType(FieldInfoUtils.java:388) > at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:259) > at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:227) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:237) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl$$anonfun$1.apply(StreamTableEnvironmentImpl.scala:236) > at scala.Option.map(Option.scala:146) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.scala:236) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.scala:81) > at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.registerDataStream(StreamTableEnvironmentImpl.scala:94) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES$.main(MysqlLogToES.scala:77) > at com.jwell56.networksyslog.jobs.jobsource.MysqlLogToES.main(MysqlLogToES.scala) > > > > > > ------------------&nbsp;????????&nbsp;------------------ > ??????:&nbsp;"Leonard Xu"<xbjt...@gmail.com <mailto:xbjt...@gmail.com>&gt;; > ????????:&nbsp;2020??6??10??(??????) ????1:16 > ??????:&nbsp;"user-zh"<user-zh@flink.apache.org <mailto:user-zh@flink.apache.org>&gt;; > > ????:&nbsp;Re: ????flinksql between???? > > > > Hi, > > ????????????????????????source(????)??&nbsp; ??????????mysql ??????join??????????????????????sql????????????????????????????regular join, ????join??????[1]?? > > SELECT > &nbsp; o.amout, o.currency, r.rate, o.amount * r.rate > FROM > &nbsp; Orders AS o > &nbsp; JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r > &nbsp; ON r.currency = o.currency > ????JDBC connector????????LookupSource?????????????????????????????????? connector.lookup.cache.ttl ??????????????cache???????????????????????????????????? > > Best, > Leonard Xu > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins><https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/sql/queries.html#joins&gt>; > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt <https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#jdbc-connector&gt>; > > &gt; ?? 2020??6??10????10:43???????? <932460...@qq.com <mailto:932460...@qq.com>&gt; ?????? > &gt; > &gt; hi,???????? ?????????????????????????? ??????join????????mysql???? ????????????source????????mysql???????????????????????????? > &gt; > &gt; > &gt; > &gt; > &gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------ > &gt; ??????:&amp;nbsp;"wangweigu...@stevegame.cn <mailto:wangweigu...@stevegame.cn>"<wangweigu...@stevegame.cn <mailto:wangweigu...@stevegame.cn>&amp;gt;; > &gt; ????????:&amp;nbsp;2020??6??9??(??????) ????6:35 > &gt; ??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org <mailto:user-zh@flink.apache.org>&amp;gt;; > &gt; > &gt; ????:&amp;nbsp;????: ?????? ????flinksql between???? > &gt; > &gt; > &gt; > &gt; > &gt; &amp;nbsp; ????1.10???? useBlinkPlanner????????????useOldPlanner???????????? > &gt; &amp;nbsp; > &gt; ?????????????????? > &gt; &amp;nbsp; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: > &gt; > &gt; LogicalProject(num=[$0]) > &gt; &amp;nbsp; LogicalJoin(condition=[AND(&amp;gt;($0, $1), <($0, $2))], joinType=[inner]) > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[num]) > &gt; &amp;nbsp;&amp;nbsp;&amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[startNum, endNum]) > &gt; > &gt; This exception indicates that the query uses an unsupported SQL feature. > &gt; > &gt; > &gt; > &gt; > &gt; &amp;nbsp; > &gt; ???????? ?????? > &gt; ?????????? 2020-06-09 17:41 > &gt; ???????? user-zh > &gt; ?????? ?????? ????flinksql between???? > &gt; hi????????????&amp;amp;nbsp; > &gt; 1 flink1.9.0 > &gt; 2 oldplanner > &gt; <dependency&amp;amp;gt; > &gt; <groupId&amp;amp;gt;org.apache.flink</groupId&amp;amp;gt; > &gt; <artifactId&amp;amp;gt;flink-table-api-scala_2.11</artifactId&amp;amp;gt; > &gt; <version&amp;amp;gt;1.9.0</version&amp;amp;gt; > &gt; </dependency&amp;amp;gt; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; <dependency&amp;amp;gt; > &gt; <groupId&amp;amp;gt;org.apache.flink</groupId&amp;amp;gt; > &gt; <artifactId&amp;amp;gt;flink-table-planner_2.11</artifactId&amp;amp;gt; > &gt; <version&amp;amp;gt;1.9.0</version&amp;amp;gt; > &gt; </dependency&amp;amp;gt; > &gt; &amp;nbsp; > &gt; 3 streaming mode > &gt; 4. ???????????? > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val sqlStream = env.createInput(jdbcInput) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; tnv.registerDataStream("sqlStream",sqlStream,'netstruct_id,'start_ip,'end_ip) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; tnv.registerDataStream("OMstream",value,'ip) > &gt; //&amp;amp;nbsp; &amp;amp;nbsp; val table = tnv.sqlQuery("select * from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as&amp;amp;nbsp; b on a.ip &amp;amp;gt;b.start_ip and a.ip<b.end_ip") > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val table = tnv.sqlQuery("select b.netstruct_id from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;gt; b.start_ip and a.ip <b.end_ip ") > &gt; &amp;amp;nbsp; &amp;amp;nbsp; val resRow = table.toRetractStream[Row] > &gt; &amp;nbsp; > &gt; 5 ???????????? > &gt; Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:&amp;amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; LogicalProject(netstruct_id=[$1]) > &gt; &amp;amp;nbsp; LogicalJoin(condition=[AND(&amp;amp;gt;($0, $2), <($0, $3))], joinType=[left]) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[1], fields=[ip]) > &gt; &amp;amp;nbsp; &amp;amp;nbsp; FlinkLogicalDataStreamScan(id=[2], fields=[netstruct_id, start_ip, end_ip]) > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; This exception indicates that the query uses an unsupported SQL feature. > &gt; Please check the documentation for the set of currently supported SQL features. > &gt; at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245) > &gt; at org.apache.flink.table.plan.Optimizer.optimizeLogicalPlan(Optimizer.scala:160) > &gt; at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:66) > &gt; at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410) > &gt; at org.apache.flink.table.planner.StreamPlanner.org$apache$flink$table$planner$StreamPlanner$$translate(StreamPlanner.scala:182) > &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > &gt; at org.apache.flink.table.planner.StreamPlanner$$anonfun$translate$1.apply(StreamPlanner.scala:127) > &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > &gt; at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > &gt; at scala.collection.Iterator$class.foreach(Iterator.scala:891) > &gt; at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > &gt; at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > &gt; at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > &gt; at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > &gt; at scala.collection.AbstractTraversable.map(Traversable.scala:104) > &gt; at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127) > &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:201) > &gt; at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:124) > &gt; at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146) > &gt; at com.jwell56.linkstarck.LInkStream$.main(LInkStream.scala:37) > &gt; at com.jwell56.linkstarck.LInkStream.main(LInkStream.scala) > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; 6 ??????????????&amp;amp;nbsp; > &gt; select b.netstruct_id from&amp;amp;nbsp; OMstream as&amp;amp;nbsp; a left join sqlStream as b on a.ip &amp;amp;gt; b.start_ip > &gt; ??????????????????????????????&amp;amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; ?????? > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; ------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------ > &gt; ??????:&amp;amp;nbsp;"Benchao Li"<libenc...@apache.org <mailto:libenc...@apache.org>&amp;amp;gt;; > &gt; ????????:&amp;amp;nbsp;2020??6??9??(??????) ????4:37 > &gt; ??????:&amp;amp;nbsp;"user-zh"<user-zh@flink.apache.org <mailto:user-zh@flink.apache.org>&amp;amp;gt;; > &gt; &amp;nbsp; > &gt; ????:&amp;amp;nbsp;Re: ????flinksql between???? > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; &amp;nbsp; > &gt; ???????????????????????? > &gt; 1. ????????Flink???????? > &gt; 2. ??????planner????blink planner????old planner?? > &gt; 3. ??????streaming mode????batch mode?? > &gt; 4. ?????????????????????? > &gt; &amp;nbsp; > &gt; ?????? <932460...@qq.com&amp;amp;gt; ??2020??6??9?????? ????4:26?????? > &gt; &amp;nbsp; > &gt; &amp;amp;gt; hi??????flinksql?????? select * from a join b on a.ip <b.startip and a.ip > &gt; &amp;amp;gt; &amp;amp;amp;gt;b.endip ???????????? ???????????? ??????????????????between??????????????