Re: Dimension table implementation cannot load configuration files

2020-08-04
You can use a static class to load the resource and return a Properties object.
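
A minimal sketch of that idea, assuming the properties file (hypothetically named dim-table.properties) sits on the job classpath; the TableFunction can then call ConfigHolder.get() without doing any file I/O itself:

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Loads the resource once, in a static initializer, and exposes it read-only.
public final class ConfigHolder {
    private static final Properties PROPS = new Properties();

    static {
        try (InputStream in = ConfigHolder.class.getClassLoader()
                .getResourceAsStream("dim-table.properties")) {
            if (in != null) {
                PROPS.load(in);
            }
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private ConfigHolder() {}

    public static Properties get() {
        return PROPS;
    }
}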

> On Aug 4, 2020, at 4:55 PM, "guaishushu1...@163.com" wrote:
> 
> Our dimension-table function extends TableFunction, and that way there is no way to load a configuration file. Does anyone have a good approach for this?
> 
> 
> 
> guaishushu1...@163.com


Re: Flink SQL escape character problem

2020-07-31
Adding a backslash works: \;  That said, a semicolon shouldn't really be a special character.
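
For illustration only, the escaped call might look like the snippet below. Whether the backslash is needed at all depends on the platform that splits submitted SQL on semicolons, and the table and field names here are made up:

// Hypothetical usage of the workaround above: escape the literal semicolon
// so the surrounding platform's statement splitter leaves it alone.
tableEnvironment.executeSql(
    "SELECT SPLIT_INDEX(msg, '\\;', 0) AS first_part FROM my_source");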

> On Jul 31, 2020, at 8:13 PM, zilong xiao wrote:
> 
> SPLIT_INDEX(${xxx}, ';', 0): I want to split a string on semicolons, but the
> semicolon seems to be treated as a special character and the syntax check never passes. Online sources say it can be escaped, but I couldn't work out how. Has anyone run into something similar and can give some pointers?


Re: Flink SQL Kafka to MySQL: INSERT syntax and unique primary key problems

2020-07-31
Switch the sink to update (upsert) mode; you can also change the unique primary key to a natural key.
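
As a hedged sketch, on Flink 1.11 the new-style JDBC DDL below declares the PRIMARY KEY so the sink writes in upsert mode (duplicate keys update the row instead of failing), and the INSERT drops the VALUES wrapper around the subquery; connection details are copied from the snippet below:

// Flink 1.11 "new" JDBC connector options; with a declared PRIMARY KEY
// the JDBC sink upserts rather than appending.
tableEnvironment.executeSql(
    "CREATE TABLE mysqlsink ( " +
    "  id BIGINT, " +
    "  game_id STRING, " +
    "  PRIMARY KEY (id) NOT ENFORCED " +
    ") WITH ( " +
    "  'connector' = 'jdbc', " +
    "  'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false', " +
    "  'table-name' = 'mysqlsink', " +
    "  'username' = 'root', " +
    "  'password' = 'root' " +
    ")");

// The INSERT form that parses: no VALUES(...) around the SELECT.
tableEnvironment.executeSql(
    "INSERT INTO mysqlsink SELECT a, CAST(b AS VARCHAR) FROM mySource");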

> On Jul 31, 2020, at 4:13 PM, chenxuying wrote:
> 
> hi
> I am using Flink 1.11.0; the code is as follows:
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
> tableEnvironment.executeSql(" " +
> " CREATE TABLE mySource ( " +
> "  a bigint, " +
> "  b bigint " +
> " ) WITH ( " +
> "  'connector.type' = 'kafka', " +
> "  'connector.version' = 'universal', " +
> "  'connector.topic' = 'mytesttopic', " +
> "  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
> "  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
> "  'connector.properties.group.id' = 'flink-test-cxy', " +
> "  'connector.startup-mode' = 'latest-offset', " +
> "  'format.type' = 'json' " +
> " ) ");
> tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
> " id bigint, " +
> "  game_id varchar, " +
> "  PRIMARY KEY (id) NOT ENFORCED  " +
> " )  " +
> " with ( " +
> "  'connector.type' = 'jdbc',   " +
> "  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
> "  'connector.username' = 'root' , " +
> "  'connector.password' = 'root',  " +
> "  'connector.table' = 'mysqlsink' , " +
> "  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
> "  'connector.write.flush.interval' = '2s',  " +
> "  'connector.write.flush.max-rows' = '300'  " +
> " )");
> tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values (select a,cast(b as varchar) b from mySource)");
> 
> 
> Question 1: the INSERT statement above fails with the following error:
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot 
> apply '$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY(<RecordType(BIGINT 
> A, VARCHAR(2147483647) B)>)'. Supported form(s): 
> '$SCALAR_QUERY()'
> 
> 
> Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select 
> a,cast(b as varchar) b from mySource"); it runs, but it fails as soon as the unique primary key collides:
> Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
> '1' for key 'PRIMARY'



Re: How to keep data correct when a dimension table is updated frequently and state keeps growing

2020-06-16
How frequent is frequent, exactly? You can add a cache and refresh it periodically.
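
One possible shape for the cache-plus-periodic-refresh idea, sketched with a Guava LoadingCache inside a rich function; the cache size, the TTL, and the loadFromMysql helper are illustrative assumptions, not a tested recipe:

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Each subtask keeps a local cache; entries expire so updates to the
// dimension table are picked up on the next load.
public class DimLookupMap extends RichMapFunction<String, String> {
    private transient LoadingCache<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = CacheBuilder.newBuilder()
                .maximumSize(100_000)                   // bound memory use
                .expireAfterWrite(10, TimeUnit.MINUTES) // periodic refresh
                .build(new CacheLoader<String, String>() {
                    @Override
                    public String load(String key) throws Exception {
                        return loadFromMysql(key); // hypothetical DB lookup
                    }
                });
    }

    @Override
    public String map(String key) throws Exception {
        return cache.get(key); // loads through to the DB on miss or expiry
    }

    private String loadFromMysql(String key) {
        // placeholder: query the dimension table by key
        return key;
    }
}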

> On Jun 16, 2020, at 10:10 PM, wangxiangyan wrote:
> 
> Hi all,
> Our dimension table is updated frequently, holds about 1 GB of data, and needs frequent syncing. What is a good way to do the join against it?


Re: flink sql Temporal table join failed

2020-06-12
The join only works against a proctime attribute; see: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
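
A hedged sketch of what the linked page describes: declare the processing-time attribute as a computed column in the source DDL, then reference that column, rather than calling PROCTIME() inline, in the temporal join. The tables loosely mirror the ones in the stack trace below, and the connector properties are illustrative 1.10-style options:

// Computed column declares the processing-time attribute up front.
tableEnvironment.sqlUpdate(
    "CREATE TABLE user_behavior ( " +
    "  uid BIGINT, " +
    "  phoneType STRING, " +
    "  clickCount INT, " +
    "  `time` TIMESTAMP(3), " +
    "  proctime AS PROCTIME() " +
    ") WITH ( " +
    "  'connector.type' = 'kafka', " +
    "  'connector.version' = 'universal', " +
    "  'connector.topic' = 'user_behavior', " +
    "  'connector.properties.bootstrap.servers' = 'localhost:9092', " +
    "  'format.type' = 'json' " +
    ")");

// The temporal join must reference the left table's declared attribute:
tableEnvironment.sqlUpdate(
    "INSERT INTO user_cnt " +
    "SELECT CAST(b.`time` AS VARCHAR), u.age " +
    "FROM user_behavior AS b " +
    "JOIN users FOR SYSTEM_TIME AS OF b.proctime AS u " +
    "ON b.uid = u.uid");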
 

> On Jun 12, 2020, at 2:24 PM, Zhou Zach wrote:
> 
> Flink 1.10.0:
> Adding a PROCTIME() AS proctime column in CREATE TABLE raises an error
> 
>> On 2020-06-12 14:08:11, "Benchao Li" wrote:
>> Hi,
>> 
>> A temporal table join requires processing time; your b.`time` is a plain
>> timestamp, not an event-time attribute. See [1].
>> 
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html
>> 
>>> Zhou Zach wrote on Fri, Jun 12, 2020, at 1:33 PM:
>> 
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> 
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> 
>>> SLF4J: Found binding in
>>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> 
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> 
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> 
>>> ERROR StatusLogger No log4j2 configuration file found. Using default
>>> configuration: logging only errors to the console.
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.TableException:
>>> Cannot generate a valid execution plan for the given query:
>>> 
>>> 
>>> 
>>> 
>>> FlinkLogicalSink(name=[`default_catalog`.`default_database`.`user_cnt`],
>>> fields=[time, sum_age])
>>> 
>>> +- FlinkLogicalCalc(select=[CAST(time) AS EXPR$0, age])
>>> 
>>>   +- FlinkLogicalJoin(condition=[=($0, $2)], joinType=[inner])
>>> 
>>>  :- FlinkLogicalCalc(select=[uid, time])
>>> 
>>>  :  +- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, user_behavior, source: [KafkaTableSource(uid, phoneType,
>>> clickCount, time)]]], fields=[uid, phoneType, clickCount, time])
>>> 
>>>  +- FlinkLogicalSnapshot(period=[$cor0.time])
>>> 
>>> +- FlinkLogicalCalc(select=[uid, age])
>>> 
>>>+- FlinkLogicalTableSourceScan(table=[[default_catalog,
>>> default_database, users, source: [MysqlAsyncLookupTableSource(uid, sex,
>>> age, created_time)]]], fields=[uid, sex, age, created_time])
>>> 
>>> 
>>> 
>>> 
>>> Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left
>>> table's proctime field, doesn't support 'PROCTIME()'
>>> 
>>> Please check the documentation for the set of currently supported SQL
>>> features.
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>>> 
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> 
>>> at
>>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>>> 
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>> 
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>> 
>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> 
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> 
>>> at
>>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>>> 
>>> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:170)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:90)
>>> 
>>> at
>>> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>>> 
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>>> 
>>> at
>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>>> 
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
>>> 
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
>>> 
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal$.main(FromKafkaSinkMysqlForReal.scala:90)
>>> 
>>> at
>>> org.rabbit.sql.FromKafkaSinkMysqlForReal.main(FromKafkaSinkMysqlForReal.scala)
>>> 
>>> Caused by: org.apache.flink.table.api.TableException: Temporal table join
>>> currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime

Re: Flink job checkpoints cannot complete snapshots, with Kafka exceptions

2020-06-10
Hi. In my own experience, checkpoint failures are usually caused by bad data making an operator fail: a wrong data format, mismatched field types, a wrong field count, and the like. From the details you added, it looks like your Kafka data may be the problem, so I would check in that direction whether the data is well-formed and parses correctly.
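
Separately, the poster's own guess below about the 1-second checkpoint interval is cheap to test by loosening the cadence. A sketch with illustrative, untuned values:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);  // checkpoint every 60 s instead of every 1 s
// require a pause after each checkpoint so slow ones cannot pile up
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
// fail a checkpoint that runs unreasonably long instead of letting it block
env.getCheckpointConfig().setCheckpointTimeout(120_000);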

> On Jun 10, 2020, at 1:24 PM, Zhefu PENG wrote:
> 
> To add to this, I found the following error in the TaskManager log:
> 
> 2020-06-10 12:44:40,688 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka: Pending record count must be zero at this point: 5
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must be
> zero at this point: 5
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> ... 8 more
> 
> Hoping for some help. Thanks!
> 
> 
> Zhefu PENG wrote on Wed, Jun 10, 2020, at 1:03 PM:
> 
>> Hi all,
>> 
>> I have a simple Flink job whose chained execution graph is roughly:
>> Source: Custom Source -> Map -> Source_Map -> Empty_Filer -> Field_Filter
>> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>> 
>> 
>> After running in production for a while it started to fail: the logs say checkpoints cannot complete, and they also mention Kafka network and connection exceptions. Yet other Flink jobs read and write on the same brokers without errors. Our tentative theory is that each checkpoint takes a few hundred milliseconds to complete while our checkpoint interval is only one second, which may be hurting job performance. That is only a loose guess, though, and we have no clear starting point for debugging, so any thoughts or suggestions would be much appreciated.
>> 
>> Part of the error log:
>> 2020-06-10 12:02:49,083 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
>> checkpoint 1 @ 1591761769060 for job c41f4811262db1c4c270b136571c8201.
>> 2020-06-10 12:04:47,898 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline
>> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
>> c41f4811262db1c4c270b136571c8201 at
>> container_e27_1591466310139_21670_01_06 @
>> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
>> 2020-06-10 12:04:47,899 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
>> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 1 for operator Source: Custom Source -> Map -> Source_Map
>> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map -> Map
>> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was declined.
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>> at 

Re: sink to MySQL failed

2020-06-10
The username and password were not set.
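
As a side note, a minimal sketch for sanity-checking the account outside Flink before wiring it into the sink; the URL and credentials below are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;

public class MysqlConnCheck {
    public static void main(String[] args) throws Exception {
        // Credentials must be passed explicitly; otherwise the driver
        // connects as ''@'localhost' with no password, as in the error below.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/flinksql?useSSL=false",
                "root", "secret")) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}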

> On Jun 10, 2020, at 5:42 PM, Zhou Zach wrote:
> 
> Thanks for the reply! I forgot to set the username and password...
> 
> At 2020-06-10 16:54:43, "wangweigu...@stevegame.cn" wrote:
>> 
>> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' 
>> (using password: NO)
>> You need to specify an account that actually has privileges on this MySQL table!
>> 
>> 
>> 
>> From: Zhou Zach
>> Sent: 2020-06-10 16:32
>> To: Flink user-zh mailing list
>> Subject: sink to MySQL failed
>> SLF4J: Class path contains multiple SLF4J bindings.
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: Found binding in 
>> [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> 
>> SLF4J: Actual binding is of type 
>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>> 
>> ERROR StatusLogger No log4j2 configuration file found. Using default 
>> configuration: logging only errors to the console.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without 
>> server's identity verification is not recommended. According to MySQL 
>> 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established 
>> by default if explicit option isn't set. For compliance with existing 
>> applications not using SSL the verifyServerCertificate property is set to 
>> 'false'. You need either to explicitly disable SSL by setting useSSL=false, 
>> or set useSSL=true and provide truststore for server certificate 
>> verification.
>> (the same warning is repeated once per connection; six further copies omitted)

Re: GROUP BY on a stream

2020-06-09
You can skip the window and group directly on your fields; we have plenty of windowless business scenarios ourselves. It does use more memory, though, and if state is retained too long it can even cause an OOM.
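
A minimal sketch of a windowless aggregation, assuming a registered table named clicks and Flink 1.11 package names; the idle-state retention setting is the usual guard against the state growth mentioned above:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// expire state for keys idle between 12 h and 24 h so the unbounded
// GROUP BY does not grow without limit (illustrative values)
tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));

Table result = tEnv.sqlQuery(
    "SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
// no window, so the result is a retract stream rather than an append stream
tEnv.toRetractStream(result, Row.class).print();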

> On Jun 9, 2020, at 12:24 PM, allanqinjy wrote:
> 
> hi,
>   So specifying update-mode retract is all it takes, right? Great, many thanks, I'll give it a try!
> 
>> On 2020-06-09 12:13:10, "1048262223" <1048262...@qq.com> wrote:
>> Hi
>> 
>> 
>> You can go without a window; the result is just a retract stream rather than an append stream.
>> 
>> 
>> Best,
>> Yichao Yang
>> 
>> 
>> 
>> 
>> 
>> Sent from my iPhone
>> 
>> 
>> -- Original Message --
>> From: allanqinjy
>> Sent: Jun 9, 2020, 12:11
>> To: user-zh
>> Subject: Re: GROUP BY on a stream