Re: Dimension table implementation cannot load a configuration file
You can use a static class to load the resource and then return a Properties object.

> On Aug 4, 2020, at 4:55 PM, "guaishushu1...@163.com" wrote:
>
> A dimension table function extends TableFunction, so there seems to be no way to load a configuration file. Does anyone have a good approach for this?
>
> guaishushu1...@163.com
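A minimal sketch of the static-loader idea (the class name ConfigHolder and the file name dim-table.properties are hypothetical):

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Loads a properties file from the classpath once and caches the result,
// so each TaskManager JVM reads the file a single time.
public final class ConfigHolder {

    private static final Properties PROPS = load("dim-table.properties");

    private static Properties load(String resource) {
        Properties props = new Properties();
        try (InputStream in =
                 ConfigHolder.class.getClassLoader().getResourceAsStream(resource)) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to load " + resource, e);
        }
        return props;
    }

    public static Properties get() {
        return PROPS;
    }
}

The TableFunction's open() method can then call ConfigHolder.get(), so the function itself never has to carry non-serializable configuration state.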
Re: Flink SQL escape character question
Adding a backslash works: \;. That said, a semicolon shouldn't really be a special character.

> On Jul 31, 2020, at 8:13 PM, zilong xiao wrote:
>
> SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the semicolon seems to be treated as a special character and the syntax check keeps failing. I read online that it can be escaped, but I haven't figured out how. Has anyone run into a similar problem and can give me a pointer?
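A hedged sketch of both variants as Java-embedded SQL (the table mySource and column line are hypothetical; whether the backslash is needed depends on how the surrounding platform parses statements):

// In plain Flink SQL, a semicolon is an ordinary character inside a string literal:
tableEnvironment.executeSql(
    "SELECT SPLIT_INDEX(line, ';', 0) FROM mySource");

// If the platform treats ';' as a statement terminator, the backslash escape
// suggested above may get the statement through its parser:
tableEnvironment.executeSql(
    "SELECT SPLIT_INDEX(line, '\\;', 0) FROM mySource");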
Re: Flink SQL Kafka-to-MySQL INSERT syntax and unique primary key questions
Switch to upsert (update) mode; you could also change the unique primary key to a natural key.

> On Jul 31, 2020, at 4:13 PM, chenxuying wrote:
>
> Hi,
> I'm on Flink 1.11.0. The code is as follows:
>
> StreamExecutionEnvironment streamEnv =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
>     " CREATE TABLE mySource ( " +
>     "   a bigint, " +
>     "   b bigint " +
>     " ) WITH ( " +
>     "   'connector.type' = 'kafka', " +
>     "   'connector.version' = 'universal', " +
>     "   'connector.topic' = 'mytesttopic', " +
>     "   'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
>     "   'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
>     "   'connector.properties.group.id' = 'flink-test-cxy', " +
>     "   'connector.startup-mode' = 'latest-offset', " +
>     "   'format.type' = 'json' " +
>     " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
>     "   id bigint, " +
>     "   game_id varchar, " +
>     "   PRIMARY KEY (id) NOT ENFORCED " +
>     " ) " +
>     " with ( " +
>     "   'connector.type' = 'jdbc', " +
>     "   'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false', " +
>     "   'connector.username' = 'root', " +
>     "   'connector.password' = 'root', " +
>     "   'connector.table' = 'mysqlsink', " +
>     "   'connector.driver' = 'com.mysql.cj.jdbc.Driver', " +
>     "   'connector.write.flush.interval' = '2s', " +
>     "   'connector.write.flush.max-rows' = '300' " +
>     " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values " +
>     "(select a, cast(b as varchar) b from mySource)");
>
> Question 1: the INSERT statement above fails with:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT A, VARCHAR(2147483647) B)>)'. Supported form(s): '$SCALAR_QUERY()'
>
> Question 2: if I change the INSERT to
> tableEnvironment.executeSql("insert into mysqlsink select a, cast(b as varchar) b from mySource");
> it runs, but when a duplicate primary key arrives it fails with:
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry '1' for key 'PRIMARY'
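On question 1, VALUES expects literal rows, so wrapping a SELECT in VALUES makes the planner treat it as a scalar subquery; a plain INSERT ... SELECT is the supported form. On question 2, the new-style 'jdbc' connector in Flink 1.11 writes upserts (for MySQL, INSERT ... ON DUPLICATE KEY UPDATE) when the sink declares a PRIMARY KEY, which is what "update mode" above refers to. A hedged sketch reusing the tables from the quoted code:

// Question 1: drop the VALUES wrapper.
tableEnvironment.executeSql(
    "INSERT INTO mysqlsink SELECT a, CAST(b AS VARCHAR) AS game_id FROM mySource");

// Question 2: redeclare the sink with the Flink 1.11 option style; with the
// PRIMARY KEY declared, a duplicate key updates the row instead of failing.
tableEnvironment.executeSql(
    "CREATE TABLE mysqlsink ( " +
    "  id BIGINT, " +
    "  game_id VARCHAR, " +
    "  PRIMARY KEY (id) NOT ENFORCED " +
    ") WITH ( " +
    "  'connector' = 'jdbc', " +
    "  'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false', " +
    "  'table-name' = 'mysqlsink', " +
    "  'username' = 'root', " +
    "  'password' = 'root' " +
    ")");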
Re: How to keep data accurate when a dimension table is updated frequently and state keeps growing
How frequent is "frequent", exactly? You can add a cache and then refresh it periodically.

> On Jun 16, 2020, at 10:10 PM, wangxiangyan wrote:
>
> Hi all,
> Our dimension table is updated frequently and holds about 1 GB of data, so it needs frequent synchronization. What is a good way to do the join?
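A minimal sketch of the cache-plus-periodic-refresh idea inside a lookup-style function (Guava is assumed on the classpath; the loader, key type, and TTL are hypothetical):

import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Dimension lookup with a local cache; entries expire after a TTL, which
// forces a reload and bounds how stale a joined row can be.
public class CachedDimLookup extends TableFunction<Row> {

    private transient Cache<String, Row> cache;

    @Override
    public void open(FunctionContext context) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)                    // bound memory use
                .expireAfterWrite(10, TimeUnit.MINUTES)  // periodic refresh
                .build();
    }

    public void eval(String key) throws Exception {
        Row row = cache.getIfPresent(key);
        if (row == null) {
            row = loadFromStore(key);  // hypothetical external lookup
            if (row != null) {
                cache.put(key, row);
            }
        }
        if (row != null) {
            collect(row);
        }
    }

    private Row loadFromStore(String key) {
        // Placeholder: query MySQL/HBase/etc. for the dimension row.
        return null;
    }
}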
Re: flink sql Temporal table join failed
The join needs a proctime attribute to work; see: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html

> On Jun 12, 2020, at 2:24 PM, Zhou Zach wrote:
>
> flink 1.10.0:
> Adding a PROCTIME() AS proctime field in CREATE TABLE throws an error.
>
>> On 2020-06-12 at 14:08:11, "Benchao Li" wrote:
>> Hi,
>>
>> A temporal table join requires processing time; your b.`time` is a plain timestamp, not an event-time attribute.
>> See [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>>
>> Zhou Zach wrote on Fri, Jun 12, 2020 at 1:33 PM:
>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>>>
>>> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
>>>
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`], fields=[time, sum_age])
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>>    +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>>       :- FlinkLogicalCalc(select=[uid, time])
>>>       :  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(uid, phoneType, clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>>       +- FlinkLogicalSnapshot(period=[$cor0.time])
>>>          +- FlinkLogicalCalc(select=[uid, age])
>>>             +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex, age, created_time)]]], fields=[uid, sex, age, created_time])
>>>
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'
>>> Please check the documentation for the set of currently supported SQL features.
>>>
>>> at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>> at org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>> at org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime
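For reference, the computed-column syntax in DDL is `proctime AS PROCTIME()` (column name first), which may be why adding "PROCTIME() AS proctime" failed. A hedged sketch of the pattern the error asks for, with table and column names taken from the quoted plan but connector options shortened to illustrative values:

// Left (probe) table: declare a processing-time attribute as a computed column.
tableEnvironment.sqlUpdate(
    "CREATE TABLE user_behavior ( " +
    "  uid BIGINT, " +
    "  phoneType STRING, " +
    "  clickCount INT, " +
    "  `time` TIMESTAMP(3), " +
    "  proctime AS PROCTIME() " +
    ") WITH ( " +
    "  'connector.type' = 'kafka', " +
    "  'connector.version' = 'universal', " +
    "  'connector.topic' = 'user_behavior', " +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092', " +
    "  'format.type' = 'json' " +
    ")");

// Join the dimension table as of the left table's proctime column,
// not as of a plain timestamp or a bare PROCTIME() call:
tableEnvironment.sqlUpdate(
    "INSERT INTO user_cnt " +
    "SELECT CAST(b.`time` AS VARCHAR), u.age " +
    "FROM user_behavior AS b " +
    "JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u " +
    "ON b.uid = u.uid");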
Re: Flink job checkpoints cannot complete snapshots, and Kafka exceptions are reported
Hi. From my own experience, checkpoint failures are usually caused by bad data making an operator fail: wrong data format, mismatched field types, wrong field counts, and so on. Judging from the details you added, your Kafka data may be the problem; I would check whether the data is well-formed and whether it parses correctly.

> On Jun 10, 2020, at 1:24 PM, Zhefu PENG wrote:
>
> To add on: I found the following error in the TaskManager log:
>
> 2020-06-10 12:44:40,688 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Pending record count must be zero at this point: 5
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 5
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
>     ... 8 more
>
> Any help would be appreciated, thanks!
>
> Zhefu PENG wrote on Wed, Jun 10, 2020 at 1:03 PM:
>
>> Hi all,
>>
>> I have a simple Flink job whose chained execution graph is roughly:
>> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>>
>> After running in production for a while it started failing. The logs say checkpoints cannot complete, and they also mention Kafka network and connection exceptions. Other Flink jobs read from and write to the same brokers without errors. Our tentative theory is that each checkpoint takes several hundred milliseconds to complete while our checkpoint interval is only one second, which may be hurting job performance. But that is a shaky guess and we have no good starting point for debugging, so any views or suggestions would be much appreciated.
>>
>> Partial error log:
>> 2020-06-10 12:02:49,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
>> 2020-06-10 12:04:47,898 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job c41f4811262db1c4c270b136571c8201 at container_e27_1591466310139_21670_01_06 @ hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
>> 2020-06-10 12:04:47,899 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>     at
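If the one-second interval is indeed too aggressive, a hedged sketch of loosening the checkpoint settings (env is the job's existing StreamExecutionEnvironment; all values are illustrative). Note this only addresses the interval theory; the "Pending record count must be zero" error itself points at the Kafka producer failing during flush and is worth investigating separately:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint every 60 s instead of every 1 s.
env.enableCheckpointing(60_000L);

// Require a pause between checkpoints so they cannot pile up back to back.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

// Let a slow checkpoint fail on its own after 10 minutes.
env.getCheckpointConfig().setCheckpointTimeout(600_000L);

// At most one checkpoint in flight at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);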
Re: sink to MySQL failed
The username and password were not set.

> On Jun 10, 2020, at 5:42 PM, Zhou Zach wrote:
>
> Thanks for the reply! I forgot to set the username and password...
>
> At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" wrote:
>>
>> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
>> You need to specify an account that has privileges on this MySQL table!
>>
>> From: Zhou Zach
>> Sent: 2020-06-10 16:32
>> To: Flink user-zh mailing list
>> Subject: sink to MySQL failed
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>>
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
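The fix, sketched with the 1.10-style JDBC options used at the time (table schema, URL, and credentials are illustrative); appending useSSL=false to the JDBC URL also silences the repeated SSL warning above:

tableEnvironment.sqlUpdate(
    "CREATE TABLE user_sink ( " +
    "  uid BIGINT, " +
    "  age INT " +
    ") WITH ( " +
    "  'connector.type' = 'jdbc', " +
    "  'connector.url' = 'jdbc:mysql://localhost:3306/flinksql?useSSL=false', " +
    "  'connector.table' = 'user_sink', " +
    "  'connector.username' = 'root', " +  // the missing credentials
    "  'connector.password' = 'root' " +
    ")");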
Re: GROUP BY on a stream
You can skip the window and group directly on your field; we have many production scenarios without windows. It does use more memory, though, and if the job runs long enough it can even lead to OOM.

> On Jun 9, 2020, at 12:24 PM, allanqinjy wrote:
>
> Hi,
> So I just set the update mode to retract? Great, thanks, I'll give it a try!
>
>> On 2020-06-09 12:13:10, "1048262223" <1048262...@qq.com> wrote:
>> Hi,
>>
>> You can do it without a window; the result is just a retract stream instead of an append stream.
>>
>> Best,
>> Yichao Yang
>>
>> Sent from my iPhone
>>
>> -------- Original message --------
>> From: allanqinjy
>> Sent: Jun 9, 2020, 12:11
>> To: user-zh
>> Subject: Re: GROUP BY on a stream
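A hedged sketch of consuming a non-windowed GROUP BY as a retract stream (assumes a StreamTableEnvironment tableEnv and a registered clicks table with a user_id column; all names are hypothetical):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Non-windowed aggregation: every new input row may revise an earlier result.
Table counts = tableEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");

// toRetractStream emits (true, row) for additions and (false, row) for
// retractions of previously emitted results.
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(counts, Row.class);
retractStream.print();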