Re: Question: error "Could not write the Yarn connection information." when starting a session
Which Flink version are you using?

thanks,
godfrey

--
From: 李军
Sent: Friday, November 22, 2019 11:10
To: user-zh
Subject: Question: error "Could not write the Yarn connection information." when starting a session

The error screenshot and details are here: https://blog.csdn.net/qq_37518574/article/details/103197224
Also, what needs to be started before launching this? YARN and HDFS are both already running.
I'm a beginner; any guidance is appreciated, thanks.
Question: error "Could not write the Yarn connection information." when starting a session

The error screenshot and details are here: https://blog.csdn.net/qq_37518574/article/details/103197224
Also, what needs to be started before launching this? YARN and HDFS are both already running.
I'm a beginner; any guidance is appreciated, thanks.
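For context, launching a YARN session normally only requires HDFS and YARN to be up, plus a Hadoop configuration visible to the Flink client. A command sketch, where the paths and memory sizes are illustrative assumptions rather than values from the original report:

```
# Point the Flink client at the Hadoop configuration
export HADOOP_CONF_DIR=/etc/hadoop/conf   # adjust to your cluster

# Start a Flink session cluster on YARN
./bin/yarn-session.sh -jm 1024m -tm 4096m -s 2
```

If YARN and HDFS are already running, this particular error often means the client failed to write its YARN properties file (`.yarn-properties-<user>`) to the temp directory, so checking write permissions on `java.io.tmpdir` (usually `/tmp`) is a reasonable first step.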
Re: Question about Flink 1.7.2 checkpoint local-file storage: setting the number of files
Hi

How many files each checkpoint produces is determined by the job; it is not something the user needs to configure manually.

Best,
Congxian

Godfrey Johnson wrote on Friday, November 22, 2019, 9:26:
> I'm not sure which file count you mean.
>
> If you mean the number of retained checkpoints, see:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
>
>> On November 21, 2019 at 22:41, ?青春狂-^ wrote:
>>
>> Hi
>> Flink's checkpoint storage can be configured in several ways: memory, HDFS, RocksDB, local files.
>> I chose and configured the local-file option. The official documentation describes a setting for the number of checkpoints, but I could not find one for the number of files. Is there such a configuration?
Re: DML deduplication: error during translate
Hi JingsongLee, 晓令:

Thank you both for the answers. For the record, here is the issue link: https://issues.apache.org/jira/projects/FLINK/issues/FLINK-14899?filter=allissues

叶贤勋
yxx_c...@163.com

On November 21, 2019 at 22:00, 贺小令(晓令) wrote:
[quoted reply and earlier messages trimmed; they appear in full elsewhere in this thread]
Re: Question about Flink 1.7.2 checkpoint local-file storage: setting the number of files
I'm not sure which file count you mean.

If you mean the number of retained checkpoints, see: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained

> On November 21, 2019 at 22:41, ?青春狂-^ wrote:
>
> Hi
> Flink's checkpoint storage can be configured in several ways: memory, HDFS, RocksDB, local files.
> I chose and configured the local-file option. The official documentation describes a setting for the number of checkpoints, but I could not find one for the number of files. Is there such a configuration?
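For reference, the retained-checkpoint setting from the linked documentation goes into flink-conf.yaml. A minimal sketch, where the backend choice and directory path are illustrative assumptions:

```
# Number of completed checkpoints to retain
state.checkpoints.num-retained: 2

# Filesystem-backed checkpoint storage (illustrative local path)
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
```

Note that this key controls how many completed checkpoints Flink keeps, not how many files a single checkpoint writes; the file count per checkpoint is determined by the job itself.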
Question about Flink 1.7.2 checkpoint local-file storage: setting the number of files

Hi
Flink's checkpoint storage can be configured in several ways: memory, HDFS, RocksDB, local files. I chose and configured the local-file option. The official documentation describes a setting for the number of checkpoints, but I could not find one for the number of files. Is there such a configuration?
Re: Error:java: 无效的标记 (invalid flag): --add-exports=java.base/sun.net.util=ALL-UNNAMED
Thank you very much. It works for me.

> On November 14, 2019 at 13:06, Biao Liu wrote:
>
> Hi,
>
> I encountered the same issue when setting up a dev environment.
>
> It seems that my IntelliJ (2019.2.1) unexpectedly activated the java11
> profile of Maven, which does not match the Java compiler (JDK 8). I'm not
> sure why it happened silently.
>
> So for me, the solution was "IntelliJ" -> "View" -> "Tool Windows" ->
> "Maven" -> "Profiles" -> uncheck "java11" -> reimport the Maven project.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>> On Mon, 4 Nov 2019 at 18:01, OpenInx wrote:
>>
>> Hi
>> I met the same problem before. After some digging, I found that IDEA
>> detects the JDK version and decides whether to pass the JDK 11 options to
>> the Flink Maven build: if you are in a JDK 11 environment, it adds the
>> --add-exports option when building in IDEA.
>>
>> In my case, I was on IntelliJ IDEA 2019.2, which bundles JDK 11, and once
>> I re-imported the Flink modules, IDEA added the --add-exports flag even
>> though I had explicitly removed all such flags from .idea/compile.xml.
>> Since IntelliJ's own JDK affected the Flink Maven build, I switched to the
>> IntelliJ build bundled with JDK 8, and the problem was gone.
>>
>> You can verify whether it is really the same issue by replacing your IDEA
>> with the package whose name ends in "with bundled JBR 8" from [1].
>> For example, on macOS you would download "2019.2.4 for macOS with bundled
>> JBR 8 (dmg)".
>>
>> Hope it works for you.
>> Thanks.
>>
>> [1]. https://www.jetbrains.com/idea/download/other.html
>>
>>> On Mon, Nov 4, 2019 at 5:44 PM Till Rohrmann wrote:
>>>
>>> Try to reimport the Maven project. This should resolve the issue.
>>>
>>> Cheers,
>>> Till
>>>
>>>> On Mon, Nov 4, 2019 at 10:34 AM 刘建刚 wrote:
>>>>
>>>> Hi, I am using Flink 1.9 in IDEA, but when I run a unit test, IDEA
>>>> reports the following error: "Error:java: 无效的标记 (invalid flag):
>>>> --add-exports=java.base/sun.net.util=ALL-UNNAMED". Everything is OK
>>>> when I use Flink 1.6. I am using JDK 1.8. Is it related to the Java
>>>> version?
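As an alternative to the IDE-side fix above, an auto-activated Maven profile can also be switched off on the command line. A sketch, assuming Maven's `!profile` deactivation syntax (quoted so the shell does not interpret `!`):

```
# Build Flink modules with the auto-activated java11 profile deactivated
mvn clean install -DskipTests -P '!java11'
```

This only affects command-line builds; for builds launched from IDEA, the profile still has to be unchecked in the Maven tool window as described above.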
Re: DML deduplication: error during translate
hi 叶贤勋:

Your SQL first does ORDER BY proc desc and then takes rownum = 1, which is equivalent to last-row logic. That produces retraction, but the result table (user_dist) defines no primary-key (pk) information, which is not supported, hence the error you saw.

If you change ORDER BY proc desc to ORDER BY proc asc and keep rownum = 1, that is equivalent to first-row logic, which produces no retraction; the result table (user_dist) can then satisfy the requirement.

However, the blink planner currently mishandles PROCTIME(): during SQL optimization the PROCTIME() attribute is dropped and the column is treated as a plain timestamp type, so the query will not be translated into first-row logic. I have opened an issue to fix this.

thanks,
godfrey

--
From: JingsongLee
Sent: Thursday, November 21, 2019 18:44
To: user-zh; Jark Wu; godfrey he (JIRA)
Subject: Re: DML deduplication: error during translate

[quoted messages trimmed; the full reply and the original post appear elsewhere in this thread]
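For illustration, the first-row rewrite suggested above would look like the sketch below, based on SQL3 from the original post. The date format string is assumed to be 'yyyy-MM-dd HH:00' (the archive garbled it), and, as noted above, the PROCTIME() planner bug may still prevent this from being translated into first-row logic on Flink 1.9:

```sql
INSERT INTO user_dist
SELECT dt, user_id, behavior
FROM (
  SELECT dt, user_id, behavior,
         ROW_NUMBER() OVER (
           PARTITION BY dt, user_id, behavior
           ORDER BY proc ASC  -- asc keeps the first row: no retraction
         ) AS rownum
  FROM (
    SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,
           user_id, behavior,
           PROCTIME() AS proc
    FROM user_log
  )
)
WHERE rownum = 1;
```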
Re: DML deduplication: error during translate
Hi 叶贤勋:

Deduplication does support the INSERT INTO ... SELECT syntax. The question is why your SQL does not produce a UniqueKey; there may be a blink-planner bug here.

CC: @Jark Wu @godfrey he (JIRA)

Best,
Jingsong Lee

--
From: 叶贤勋
Send Time: Thursday, November 21, 2019 16:20
To: user-zh@flink.apache.org
Subject: DML deduplication: error during translate

[original message trimmed; it appears in full elsewhere in this thread]
DML deduplication: error during translate
Hi everyone:

Flink version 1.9.0.

SQL1:
CREATE TABLE user_log (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'localhost:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'localhost:9092',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

SQL2:
CREATE TABLE user_dist (
  dt VARCHAR,
  user_id VARCHAR,
  behavior VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
  'connector.table' = 'user_behavior_dup',
  'connector.username' = 'root',
  'connector.password' = '**',
  'connector.write.flush.max-rows' = '1'
);

SQL3:
INSERT INTO user_dist
SELECT dt, user_id, behavior
FROM (
  SELECT dt, user_id, behavior,
         ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc desc) AS rownum
  FROM (
    SELECT DATE_FORMAT(ts, '-MM-dd HH:00') AS dt, user_id, behavior, PROCTIME() AS proc
    FROM user_log
  )
)
WHERE rownum = 1;

When executing tableEnv.sqlUpdate(SQL3), it fails with:

Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)

Does deduplication currently not support the INSERT INTO ... SELECT syntax?

叶贤勋
yxx_c...@163.com