flink connect kafka

2020-09-17 文章 wch...@163.com
flink版本 1.10.0 没有使用checkpoint Kafka version : 0.10.2.1 数据源为kafka 代码如下: val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX) val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] = new FlinkKafkaConsumer(topicHkList, new

Re: 统计数据含有中间回撤数据的问题

2020-09-17 文章 xushanshan
hi, Jark 开启 minibatch 是将中间数据按一批次处理,如果中间回撤数据和后续的更新数据分到两个minibatch里了,还是不能避免下游系统查询到中间结果的问题 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 统计数据含有中间回撤数据的问题

2020-09-17 文章 Jark Wu
开启 minibatch 可以基本解决中间结果的问题: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation Best, Jark On Fri, 18 Sep 2020 at 11:57, xushanshan <1337220...@qq.com> wrote: > 问题内容已修改补充完成 > > > > -- > Sent from:

Re: 统计数据含有中间回撤数据的问题

2020-09-17 文章 xushanshan
问题内容已修改补充完成 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink sql 消费kafka的顺序是怎么样的 第二次运行sql的结果和第一次不同

2020-09-17 文章 anonnius
hi: [求助] 我这里用flink-sql消费kafka数据, 通过窗口做pvuv的计算, 第一次和第二次计算的结果不一致, 不太了解为什么 0> mac本地环境 1> flink 1.11.1 2> kafka 0.10.2.2, topic为message-json, 分区为3, 副本为1 3> 使用的是sql-client.sh 环境 4> 先在sql-cli中创建了iservVisit表 create table iservVisit ( type string comment '时间类型', uuid string comment '用户uri',

????stream??????????????????????????

2020-09-17 文章 ??????
??kafka?? //kafka DataStreamSource

ListState 设置 TTL 会在 list 中删除之前的记录吗

2020-09-17 文章 wangl...@geekplus.com
考虑下面的场景: KeyBy userId, 把该 userId 所用的相关记录存起来放在 ListState 中 private transient ListState list; @Override public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception { list.add(value.f1); } TTL 设为 7 天。 如果这个 userId 超过 7 天没有任何消息,那这个 userId 相应的 ListState 会被删除。 但如果这

Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 文章 jun su
hi godfrey, 我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env, 再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题 godfrey he 于2020年9月17日周四 下午10:07写道: > TableEnvironment 不是多线程安全的。 > > btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗? > > Jeff Zhang 于2020年9月14日周一 下午12:10写道: > > >

Re: Flink 1.11 jdbc查pg失败

2020-09-17 文章 Jark Wu
能贴下你的 DDL 和 query 吗? 你可以试试用反引号, select `F1`, `F2` from xxx; Best, Jark On Thu, 17 Sep 2020 at 23:28, godfrey he wrote: > 据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解 > > wdmcode 于2020年9月10日周四 上午9:44写道: > > > Hi Jimmy > > > > 给字段加双引号试试呢 > > Select “F1”,”F2” from xxx.xxx; > > > > > > 发件人: Jimmy

Re:Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 文章 kandy.wang
@Jingsong Li 测了一下,1.11.2还是和以前一样呢。 还是table.exec.hive.fallback-mapred-writer=false效果明显。 我们flink 环境是基于 flink 1.11 分支源码自己 打的jar 来测的。你们那边针对 StreamingFileWriter 修改应该都提交到flink 1.11分支了吧。 顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ? 在 2020-09-17 14:19:42,"Jingsong Li" 写道: >是的,可以测一下,理论上 mr writer不应该有较大性能差距。 >

Re: Flink 1.11 jdbc查pg失败

2020-09-17 文章 godfrey he
据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解 wdmcode 于2020年9月10日周四 上午9:44写道: > Hi Jimmy > > 给字段加双引号试试呢 > Select “F1”,”F2” from xxx.xxx; > > > 发件人: Jimmy Zhang > 发送时间: Thursday, September 10, 2020 9:41 AM > 收件人: user-zh@flink.apache.org > 主题: Flink 1.11 jdbc查pg失败 > > flink 1.11用jdbc查询pg表时,pg表的字段是大写

Re: flink sql执行sql语句无法执行的错误-No operators defined in streaming topology. Cannot execute.

2020-09-17 文章 godfrey he
能提供完整的demo吗? me 于2020年9月11日周五 下午6:54写道: > 1.flink 版本是1.11.1 > streamEnv = StreamExecutionEnvironment.getExecutionEnvironment > streamBlinkSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > streamTableEnv = StreamTableEnvironment.create(streamEnv, >

Re: 多线程模式下使用Blink TableEnvironment

2020-09-17 文章 godfrey he
TableEnvironment 不是多线程安全的。 btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗? Jeff Zhang 于2020年9月14日周一 下午12:10写道: > 参考zeppelin的做法,每个线程里都调用这个 > > > https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111 > > > jun su

Re: flink hive批量作业报FileNotFoundException

2020-09-17 文章 godfrey he
cc @Rui Li 李佳宸 于2020年9月14日周一 下午5:11写道: > 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件 > 版本是1.11.1 > Caused by: java.io.FileNotFoundException: File > > hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144 > does

Re: 关于flink cdc 测试时遇到的几种问题,比较疑惑,各位大佬求帮助

2020-09-17 文章 godfrey he
sql client的默认并发为1,如果没有在sql-client-defaults.yaml显示设置parallelism,代码里面的默认并发为1.因此需要显示的设置 sql-client-defaults.yaml的parallelism Jark Wu 于2020年9月15日周二 上午11:43写道: > Hi, > > 请问 > 1. 有完整的异常栈吗? 你是怎么从 ck 恢复的呢? 用的什么命令? > 2. 是的。因为 source 只能并发1。先写到 kafka,再从 kafka 同步是可以的。 > > Best, > Jark > > On Fri, 11 Sep

Re: Flink SQL create view问题

2020-09-17 文章 godfrey he
已知问题,已fix:https://issues.apache.org/jira/browse/FLINK-18750 guaishushu1...@163.com 于2020年9月16日周三 下午2:32写道: > 当create_view和LATERAL TABLE 共用时 会出现字段找不到异常 > > 语法: > CREATE TABLE billing_data_test ( > message STRING > > > create view v1 as > select T.* > from billing_data_test, > LATERAL

Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 文章 godfrey he
blink 根据每个算子的digest信息来判断是否可以reuse(只有digest完全一样才可以reuse), 例如table source节点,算子信息包括:表名,select的字段信息,其他push down之后的信息等。 你可以通过explain的方式把plan打印出来看看,source的digest是否一样 Jingsong Li 于2020年9月17日周四 下午2:45写道: > 你仔细看看这两个数据源是不是有什么不同 > 只要有一点不同,Blink 就 reuse 不了 > > On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai

Re: flink 1.9 关于回撤流的问题

2020-09-17 文章 godfrey he
可以用flink提供的“去重"语法来支持 [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D Shengkai Fang 于2020年9月15日周二 下午4:02写道: > hi, 我对于使用upsert > > kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗? > > star

Re: 使用flinksql时 jdbc connector参数不起作用

2020-09-17 文章 Jark Wu
> sink.buffer-flush.max-rows = '0' 导致每接收一条数据就插入数据库 这个应该是个 bug,我建了个 issue:https://issues.apache.org/jira/browse/FLINK-19280 Best, Jark On Thu, 17 Sep 2020 at 18:15, chenxuying wrote: > 环境是flink1.11.2+idea > sql: > CREATE TABLE sourceTable ( > platform STRING > ,game_id bigint > ) WITH

Re: K8s native 部署失败

2020-09-17 文章 Yang Wang
从你发的报错栈来看TM是用的ip地址去连的,正常如果是非HA的话,应该是通过service来连接的 因为JM在非HA情况下rpc地址是bind到service上的 你是否有对Flink的代码做修改呢,或者用native模式起来以后,修改过ConfigMap等 Best, Yang yanzhibo 于2020年9月17日周四 下午3:55写道: > 是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的 > > > > 2020年9月17日 上午11:10,Yang Wang 写道: > > > >

使用flinksql时 jdbc connector参数不起作用

2020-09-17 文章 chenxuying
环境是flink1.11.2+idea sql: CREATE TABLE sourceTable ( platform STRING ,game_id bigint ) WITH ( ... ); CREATE TABLE sinktable ( platform STRING ,game_id bigint ) WITH ( 'connector' = 'jdbc', 'url' = '', 'table-name' = '', 'driver' = 'com.mysql.jdbc.Driver',

Re: 关于官方的k8s operator

2020-09-17 文章 Harold.Miao
谢谢 Yang Wang 于2020年9月17日周四 上午11:20写道: > Flink官方其实是没有开发K8s Operator的,目前使用比较多的有lyft[1]和google[2]开发的两个 > 都已经在生产环境使用了,支持的都是Standalone job/application on K8s,还不支持native[3]的集成 > > 如果你想自己实现一个K8s Operator支持native模式的话,我之前做过一个POC,你可以参考一下[4] > > > [1]. https://github.com/lyft/flinkk8soperator > [2].

关于checkpointFunction接口

2020-09-17 文章 smq
大佬好,现在有个疑问,因为要用到自定义的state 保存某个值,这个状态是在keydprocessFunction实现类中用到的,我在sink的时候,实现了CheckpointFunction接口,实现了snapshotState方法,在这个方法中,我只写了一个flush到kudu的方法。这样的话我之前自己定义的state能保存到状态后端吗。 换句话说,就是snapshotState这个方法是在做checkpoint的同时调用了这个方法中的动作,还是说其他的状态不做了,只做我实现的snapshotState这个方法呢?

Re: 请问flink jdbc connector 支持greenplum吗

2020-09-17 文章 Jark Wu
官方的暂不支持。 需要自己开发JdbcDialect插件。 On Thu, 17 Sep 2020 at 13:59, xuzh wrote: > 请问flink jdbc connector 支持greenplum吗,还是要另外自己写插件

Per-job mode 任务失败 jm没有退出

2020-09-17 文章 Qishang
Flink 1.11.1 CDH 5.15.2 提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm 2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive /tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar flink-conf.yaml 重启策策略 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 5

Re: K8s native 部署失败

2020-09-17 文章 yanzhibo
是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的 > 2020年9月17日 上午11:10,Yang Wang 写道: > > 你这个报错看着是TM向JM注册超时了,使用的HA还是非HA部署呢 > > 如果是HA的话,TM是直接使用JM的Pod ip进行通信的,这个时候需要登录pod确认一下网络是否是通的 > 如果是非HA的话,TM是使用service来向JM注册,你需要检查一下K8s的kube proxy是否正常 > > 另外,是所有TM都注册不上来,还是只有个别的。这个也可以排除网络问题 > > >

Re: python udf 提交到本地节点执行报错

2020-09-17 文章 myfjdthink
感谢搞定了,根据你提供的文档,我把命令改为 flink run -py src/etl/hello_world.py -pyexec /usr/local/opt/python@3.7/bin/python3 指定了 python 执行器就行了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: python udf 提交到本地节点执行报错

2020-09-17 文章 Xingbo Huang
Hi, 你可以参考文档[1],里面的api set_python_executable(python_exec)用来设置你的Python环境的,然后你需要确保你这个python环境有安装pyflink。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#python-dependency Best, Xingbo myfjdthink 于2020年9月17日周四 下午3:13写道:

Re: python udf 提交到本地节点执行报错

2020-09-17 文章 myfjdthink
你好,我的本地集群是单点的,直接使用文档里的 bin/start-cluster.sh 命令启动的。 我扫了一遍文档,没找到介绍如何配置集群里的 pylink 相关的信息,可以麻烦你告诉我相关文档在哪里吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: python udf 提交到本地节点执行报错

2020-09-17 文章 Xingbo Huang
Hi, 你可以看到报错信息里面有这么一条: ImportError: No module named pyflink 看起来是你的集群环境使用的python环境没有安装pyflink Best, Xingbo myfjdthink 于2020年9月17日周四 下午2:50写道: > 操作系统 > > Mac OS > > flink --version > > Version: 1.11.1, Commit ID: 7eb514a > > > 代码 > > from pyflink.table import StreamTableEnvironment,

Re: pyflink连接器支持问题

2020-09-17 文章 Xingbo Huang
Hi, 现在flink没有提供官方的IBM MQ的connector实现,现在已经支持的connector类型,你可以参考文档[1]。如果你需要支持其他connector,你需要提供自定义的connector的java实现,然后在你的python作业里面通过api或者命令行参数的方式把connector的Jar包添加进去,具体可以参考文档[2] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/ [2]

python udf 提交到本地节点执行报错

2020-09-17 文章 myfjdthink
操作系统 Mac OS flink --version Version: 1.11.1, Commit ID: 7eb514a 代码 from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes from pyflink.table.udf import udf # 1. create a TableEnvironment env_settings =

pyflink连接器支持问题

2020-09-17 文章 whh_960101
您好,请问pyflink现在支持的连接器有IBM MQ吗,因为需要使用到,感谢解答!

Re: Flink SQL TableSource复用问题,相同数据源聚合多个指标,引擎创建多个相同的数据源

2020-09-17 文章 Jingsong Li
你仔细看看这两个数据源是不是有什么不同 只要有一点不同,Blink 就 reuse 不了 On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote: > 场景描述: > 通过Flink SQL创建两个Kafka数据源,对数据源去重处理,然后Union ALL合并,并创建临时视图 > 然后通过Flink SQL读取临时视图进行聚合计算指标,结果写入Redis > 问题描述: > Flink SQL 解析器会为每个聚合运算创建相同的两个数据源 > > 在下面Blink

Re: Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 文章 Jingsong Li
是的,可以测一下,理论上 mr writer不应该有较大性能差距。 > 为何要强制滚动文件 因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。 On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote: > > > > ok. 就是用hadoop mr writer vs flink 自实现的native > writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer >

Re:Re: Re: Re: Re: StreamingFileWriter 压测性能

2020-09-17 文章 kandy.wang
ok. 就是用hadoop mr writer vs flink 自实现的native writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer 改成false是可以满足我们的写hive需求了 还有一个问题,之前问过你,你还没回复: HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么? 如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、