flink版本 1.10.0 没有使用checkpoint
Kafka version : 0.10.2.1
数据源为kafka
代码如下:
val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER,
HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)
val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] =
new FlinkKafkaConsumer(topicHkList, new
hi, Jark
开启 minibatch
是将中间数据按一批次处理,如果中间回撤数据和后续的更新数据分到两个minibatch里了,还是不能避免下游系统查询到中间结果的问题
--
Sent from: http://apache-flink.147419.n8.nabble.com/
开启 minibatch 可以基本解决中间结果的问题:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
Best,
Jark
On Fri, 18 Sep 2020 at 11:57, xushanshan <1337220...@qq.com> wrote:
> 问题内容已修改补充完成
>
>
>
> --
> Sent from:
问题内容已修改补充完成
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi: [求助] 我这里用flink-sql消费kafka数据, 通过窗口做pvuv的计算, 第一次和第二次计算的结果不一致, 不太了解为什么
0> mac本地环境
1> flink 1.11.1
2> kafka 0.10.2.2, topic为message-json, 分区为3, 副本为1
3> 使用的是sql-client.sh 环境
4> 先在sql-cli中创建了iservVisit表
create table iservVisit (
type string comment '时间类型',
uuid string comment '用户uri',
??kafka??
//kafka
DataStreamSource
考虑下面的场景:
KeyBy userId, 把该 userId 所用的相关记录存起来放在 ListState 中
private transient ListState list;
@Override
public void processElement(Tuple2 value, Context ctx,
Collector out)
throws Exception {
list.add(value.f1);
}
TTL 设为 7 天。
如果这个 userId 超过 7 天没有任何消息,那这个 userId 相应的 ListState 会被删除。
但如果这
hi godfrey,
我们的用法类似zeppelin, 项目形式类似notebook, 在第一次运行笔记时创建env,
再次运行notebook时会创建新线程来构建job运行, 所以我参考了zepplin的做法暂时fix了这个问题
godfrey he 于2020年9月17日周四 下午10:07写道:
> TableEnvironment 不是多线程安全的。
>
> btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
>
> Jeff Zhang 于2020年9月14日周一 下午12:10写道:
>
> >
能贴下你的 DDL 和 query 吗?
你可以试试用反引号, select `F1`, `F2` from xxx;
Best,
Jark
On Thu, 17 Sep 2020 at 23:28, godfrey he wrote:
> 据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解
>
> wdmcode 于2020年9月10日周四 上午9:44写道:
>
> > Hi Jimmy
> >
> > 给字段加双引号试试呢
> > Select “F1”,”F2” from xxx.xxx;
> >
> >
> > 发件人: Jimmy
@Jingsong Li
测了一下,1.11.2还是和以前一样呢。 还是table.exec.hive.fallback-mapred-writer=false效果明显。
我们flink 环境是基于 flink 1.11 分支源码自己 打的jar 来测的。你们那边针对 StreamingFileWriter
修改应该都提交到flink 1.11分支了吧。
顺便问一下,你们1.12版本,针对小文件合并,会有改进么 ?
在 2020-09-17 14:19:42,"Jingsong Li" 写道:
>是的,可以测一下,理论上 mr writer不应该有较大性能差距。
>
据我所知,目前flink是大小写不敏感,但是pg是大小写敏感。这问题暂时没法解
wdmcode 于2020年9月10日周四 上午9:44写道:
> Hi Jimmy
>
> 给字段加双引号试试呢
> Select “F1”,”F2” from xxx.xxx;
>
>
> 发件人: Jimmy Zhang
> 发送时间: Thursday, September 10, 2020 9:41 AM
> 收件人: user-zh@flink.apache.org
> 主题: Flink 1.11 jdbc查pg失败
>
> flink 1.11用jdbc查询pg表时,pg表的字段是大写
能提供完整的demo吗?
me 于2020年9月11日周五 下午6:54写道:
> 1.flink 版本是1.11.1
> streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamBlinkSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> streamTableEnv = StreamTableEnvironment.create(streamEnv,
>
TableEnvironment 不是多线程安全的。
btw, 你能描述一下你在多线程情况下怎么使用 TableEnvironment 的吗?
Jeff Zhang 于2020年9月14日周一 下午12:10写道:
> 参考zeppelin的做法,每个线程里都调用这个
>
>
> https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java#L111
>
>
> jun su
cc @Rui Li
李佳宸 于2020年9月14日周一 下午5:11写道:
> 大家好~我执行batch table的作业写入hive时,会出现FileNotFoundException的错误。找不到.staging文件
> 版本是1.11.1
> Caused by: java.io.FileNotFoundException: File
>
> hdfs://gykjcluster/user/hive/warehouse/etl_et_flink_sink.db/ods_et_es_financialestimate/.staging_1600070419144
> does
sql
client的默认并发为1,如果没有在sql-client-defaults.yaml显示设置parallelism,代码里面的默认并发为1.因此需要显示的设置
sql-client-defaults.yaml的parallelism
Jark Wu 于2020年9月15日周二 上午11:43写道:
> Hi,
>
> 请问
> 1. 有完整的异常栈吗? 你是怎么从 ck 恢复的呢? 用的什么命令?
> 2. 是的。因为 source 只能并发1。先写到 kafka,再从 kafka 同步是可以的。
>
> Best,
> Jark
>
> On Fri, 11 Sep
已知问题,已fix:https://issues.apache.org/jira/browse/FLINK-18750
guaishushu1...@163.com 于2020年9月16日周三 下午2:32写道:
> 当create_view和LATERAL TABLE 共用时 会出现字段找不到异常
>
> 语法:
> CREATE TABLE billing_data_test (
> message STRING
>
>
> create view v1 as
> select T.*
> from billing_data_test,
> LATERAL
blink 根据每个算子的digest信息来判断是否可以reuse(只有digest完全一样才可以reuse),
例如table source节点,算子信息包括:表名,select的字段信息,其他push down之后的信息等。
你可以通过explain的方式把plan打印出来看看,source的digest是否一样
Jingsong Li 于2020年9月17日周四 下午2:45写道:
> 你仔细看看这两个数据源是不是有什么不同
> 只要有一点不同,Blink 就 reuse 不了
>
> On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai
可以用flink提供的“去重"语法来支持
[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html#%E5%8E%BB%E9%87%8D
Shengkai Fang 于2020年9月15日周二 下午4:02写道:
> hi, 我对于使用upsert
>
> kafka能够省state感到疑惑。金竹老师提供的实现只是丢掉了delete消息,你的下游表依旧需要手动去重才可以得到准确的结果才对啊。如果每个下游表都手动去重这样子还能省state吗?
>
> star
> sink.buffer-flush.max-rows = '0' 导致每接收一条数据就插入数据库
这个应该是个 bug,我建了个 issue:https://issues.apache.org/jira/browse/FLINK-19280
Best,
Jark
On Thu, 17 Sep 2020 at 18:15, chenxuying wrote:
> 环境是flink1.11.2+idea
> sql:
> CREATE TABLE sourceTable (
> platform STRING
> ,game_id bigint
> ) WITH
从你发的报错栈来看TM是用的ip地址去连的,正常如果是非HA的话,应该是通过service来连接的
因为JM在非HA情况下rpc地址是bind到service上的
你是否有对Flink的代码做修改呢,或者用native模式起来以后,修改过ConfigMap等
Best,
Yang
yanzhibo 于2020年9月17日周四 下午3:55写道:
> 是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的
>
>
> > 2020年9月17日 上午11:10,Yang Wang 写道:
> >
> >
环境是flink1.11.2+idea
sql:
CREATE TABLE sourceTable (
platform STRING
,game_id bigint
) WITH (
...
);
CREATE TABLE sinktable (
platform STRING
,game_id bigint
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'driver' = 'com.mysql.jdbc.Driver',
谢谢
Yang Wang 于2020年9月17日周四 上午11:20写道:
> Flink官方其实是没有开发K8s Operator的,目前使用比较多的有lyft[1]和google[2]开发的两个
> 都已经在生产环境使用了,支持的都是Standalone job/application on K8s,还不支持native[3]的集成
>
> 如果你想自己实现一个K8s Operator支持native模式的话,我之前做过一个POC,你可以参考一下[4]
>
>
> [1]. https://github.com/lyft/flinkk8soperator
> [2].
大佬好,现在有个疑问,因为要用到自定义的state
保存某个值,这个状态是在keydprocessFunction实现类中用到的,我在sink的时候,实现了CheckpointFunction接口,实现了snapshotState方法,在这个方法中,我只写了一个flush到kudu的方法。这样的话我之前自己定义的state能保存到状态后端吗。
换句话说,就是snapshotState这个方法是在做checkpoint的同时调用了这个方法中的动作,还是说其他的状态不做了,只做我实现的snapshotState这个方法呢?
官方的暂不支持。 需要自己开发JdbcDialect插件。
On Thu, 17 Sep 2020 at 13:59, xuzh wrote:
> 请问flink jdbc connector 支持greenplum吗,还是要另外自己写插件
Flink 1.11.1
CDH 5.15.2
提交命令:/opt/flink-1.11.1/bin/flink run -m yarn-cluster -p 2 -yjm 1024m -ytm
2048m -ynm job_sync -c com.qcc.hive.TidbBinlogSyncHive
/tmp/flink-binlog-sync-hive-1.0-SNAPSHOT.jar
flink-conf.yaml 重启策策略
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
是非ha,所有tm都注册不上来,但是在tm的pod中 根据service 是可以ping 通 jobmanager的
> 2020年9月17日 上午11:10,Yang Wang 写道:
>
> 你这个报错看着是TM向JM注册超时了,使用的HA还是非HA部署呢
>
> 如果是HA的话,TM是直接使用JM的Pod ip进行通信的,这个时候需要登录pod确认一下网络是否是通的
> 如果是非HA的话,TM是使用service来向JM注册,你需要检查一下K8s的kube proxy是否正常
>
> 另外,是所有TM都注册不上来,还是只有个别的。这个也可以排除网络问题
>
>
>
感谢搞定了,根据你提供的文档,我把命令改为
flink run -py src/etl/hello_world.py -pyexec
/usr/local/opt/python@3.7/bin/python3
指定了 python 执行器就行了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
你可以参考文档[1],里面的api
set_python_executable(python_exec)用来设置你的Python环境的,然后你需要确保你这个python环境有安装pyflink。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/dependency_management.html#python-dependency
Best,
Xingbo
myfjdthink 于2020年9月17日周四 下午3:13写道:
你好,我的本地集群是单点的,直接使用文档里的
bin/start-cluster.sh
命令启动的。
我扫了一遍文档,没找到介绍如何配置集群里的 pylink 相关的信息,可以麻烦你告诉我相关文档在哪里吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,
你可以看到报错信息里面有这么一条:
ImportError: No module named pyflink
看起来是你的集群环境使用的python环境没有安装pyflink
Best,
Xingbo
myfjdthink 于2020年9月17日周四 下午2:50写道:
> 操作系统
>
> Mac OS
>
> flink --version
>
> Version: 1.11.1, Commit ID: 7eb514a
>
>
> 代码
>
> from pyflink.table import StreamTableEnvironment,
Hi,
现在flink没有提供官方的IBM
MQ的connector实现,现在已经支持的connector类型,你可以参考文档[1]。如果你需要支持其他connector,你需要提供自定义的connector的java实现,然后在你的python作业里面通过api或者命令行参数的方式把connector的Jar包添加进去,具体可以参考文档[2]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
[2]
操作系统
Mac OS
flink --version
Version: 1.11.1, Commit ID: 7eb514a
代码
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import udf
# 1. create a TableEnvironment
env_settings =
您好,请问pyflink现在支持的连接器有IBM MQ吗,因为需要使用到,感谢解答!
你仔细看看这两个数据源是不是有什么不同
只要有一点不同,Blink 就 reuse 不了
On Thu, Sep 17, 2020 at 11:10 AM Kevin Dai <154434...@qq.com> wrote:
> 场景描述:
> 通过Flink SQL创建两个Kafka数据源,对数据源去重处理,然后Union ALL合并,并创建临时视图
> 然后通过Flink SQL读取临时视图进行聚合计算指标,结果写入Redis
> 问题描述:
> Flink SQL 解析器会为每个聚合运算创建相同的两个数据源
>
> 在下面Blink
是的,可以测一下,理论上 mr writer不应该有较大性能差距。
> 为何要强制滚动文件
因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。
On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote:
>
>
>
> ok. 就是用hadoop mr writer vs flink 自实现的native
> writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
>
ok. 就是用hadoop mr writer vs flink 自实现的native
writer之间的性能对比了。至少目前看了一下table.exec.hive.fallback-mapred-writer
改成false是可以满足我们的写hive需求了
还有一个问题,之前问过你,你还没回复:
HiveRollingPolicy为什么 shouldRollOnCheckpoint true 为何要强制滚动文件,这个可以抽取成一个配置参数么?
如果强制滚动的话,基本上sink.rolling-policy.rollover-interval、
36 matches
Mail list logo