为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好?
一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。
用数据库存的话作业文件比较割裂,像文本文件可以直接存表,但像jar包的话得存分布式文件系统,同时在数据表中记录文件id。我个人更倾向于用git,不知道这里会不会有什么坑?还请做过的朋友给个建议,谢谢!
为支持作业上线评审和遇到问题进行版本回滚,实时计算平台中的flink作业版本管理采用哪种方案比较好?
一种声音是用数据库来存,另一种声音是使用git来管理:一个作业对应一个repo,里面有作业sql或jar文件,作业参数和配置文件等。
使用了如下命令来提交flink作业到yarn上运行,结果出错。如果job jar路径改成本地的就没有问题。我已经将
flink-oss-fs-hadoop-1.12.0.jar 放到flink
lib目录下面,并且在flink.conf配置文件中设置好了oss参数。试问,这种作业jar在远端的分布式文件系统flink难道不支持吗?
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="oss://odps-prd/rtdp/flinkLib" \
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://localhost:9000/flinkLib" \
hdfs://localhost:9000/flinkJobs/TopSpeedWindowing.jar
这种命令执行方式是可以执行的。
在 2021-01-20 10:21:32,"casel.chen" 写道:
>使用了如下命令来提交flink作业到yarn上运行,结果出
今天尝试使用yarn
application模式(带yarn.provided.lib.dirs参数),将$FLINK_LIB目录下的jar包上传到了hdfs,结果报了如下的错,是少了哪个jar包或配置文件吗?
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy
Yarn Application Cluster
at
flink sql cdc发到kafka,显示下游写kafka并行度只有1,有没有办法提高并行度呢?
显示job-parallelism, table.exec.resource.default-parallelism, parallelism.default
都是 24,但execution graph显示parallelism还是1,我设置了pipeline.operator-chaining=false
这个问题很严重啊,生产线上可不敢这么用,丢失部分数据是不能接受的。社区什么时候能支持 GTID 呢?官方网档上有写么?
在 2021-06-07 18:40:50,"董建" <62...@163.com> 写道:
>
>
>
>我也遇到了这种情况,可能是你们的db做了主从切换。
>因为binlog每台服务器的pos都不一样。
>mysql5.6以后支持了GTID的同步方式,这个是全局唯一的。但是目前mysql-cdc貌似还不支持。
>我目前的解决方案是出错后从最后的位置开始消费,可能会丢失一部分数据。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
双流interval join是否可行呢?
在 2021-06-07 16:35:10,"Jason Lee" 写道:
>
>
>我么生产环境同样遇到这种问题,因为上有流数据到了,但是维表数据未更新导致丢失部分数据,请问大家现在有好的解决方案去解决Flink SQL
>维表延迟Join的问题了吗?
>
>
>有解决方案的小伙伴能分享下嘛?
>| |
>JasonLee
>|
>|
>jasonlee1...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2021年02月25日 14:40,Suhan 写道:
如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!
flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc
connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决?
flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink
task呢?SQL正确的写法是什么?
延迟join主要是为了解决维表数据后于事实表数据到达问题。java代码可以实现,那flink sql这块能否通过sql hint解决呢?有没有示例?
需求背景:
因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink SQL作业采用的是1.13开发的。
而让平台支持不同Flink版本,我能想到有三种实现方案:
1. 平台直接调用 flink run 或 flink run-application 提交作业
优点:实现简单,每个flink版本都会带这个shell脚本
我们知道flink on 原生kubernetes当前是用k8s deployment运行一个流作业的,请问会用k8s job运行一个批作业吗?
flink sql
cdc写入kafka,期望kafka消息带上数据库database,表table,变更时间和变更标记+I/-U/+U/-D这几个特殊字段,目前来看是做不到的,对吗?
好的,我先尝试使用一下,谢谢!
在 2021-06-13 10:43:12,"Jeff Zhang" 写道:
>如果不是native k8s的话,现在已经支持了,用remote模式就可以,
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/engh3w
>native k8s的话,社区正在做,这是PR: https://github.com/apache/zeppelin/pull/4116
>
>
>casel.chen 于2021年6月
。
>
>
>在 2021-06-11 18:57:36,"casel.chen" 写道:
>>我的场景就是简单的数据同步,没有join也没有group by,从一个mysql库同步到另一个mysql库。
>>上游写入数据速度很快,如果用flink sql同步的话默认只有一个并行度写,速度会跟不上,这种情况要怎么处理?
>>用的是flink 1.12.1 其jdbc connector还不支持sink.parallism参数
>>
>>
>>
>>
>>
>>
请问 flink sql cdc 场景下如何增大下游sink端并行度?
我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。
而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到
目标mysql。是想通过kafka partition增大sink并行度
初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。
以下是作业内容:
--
有版本。钉钉群:32803524
>
>casel.chen 于2021年6月12日周六 下午5:56写道:
>
>> 需求背景:
>> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink
>> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink
>> SQL作业采用的是1.13开发的。
>>
>>
>> 而让平台支持不同Flink版本,我能想
Is there any live code example about flink state evolution with avro? Thanks!
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢!
1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka
topic开多个分区
2. 再从kafka消费,通过flink sql同步到最终mysql库
在 2021-06-08 19:49:40,"Leonard Xu" 写道:
>试着回答下这两个问题。
>
>> flink 1.12的jdbc connector不支持 sink.parallism
我在spring boot应用中使用HiveCatalog展示库和表信息,通过传入hive参数初始化HiveCatalog时抛如下错误
hiveCatalog = new HiveCatalog(hiveConfig.getCatalogName(),
hiveConfig.getDefaultDatabase(), hiveConfig.getHiveConfDir());
hiveCatalog.open();
在spring boot应用所在机器上我只在/opt/hive/conf目录下准备了hive-site.xml,还缺什么配置么?
2021-06-11
hadoop 2.9.2, flink 1.12.2, hudi 0.9.0-SNAPSHOT
core-site.xml里面配置了oss相关信息,本地启flink cluster,执行flink sql client创建表,写数据和查询都没问题。
改成在项目中flink sql作业,打包成fat jar以local方式运行,项目中引用了 flink-oss-fs-hadoop,但程序报了如下错误
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem
for scheme "oss"
获取connector是否支持source和sink只要看 XXXDynamicTableFactory 是否实现
DynamicTableSourceFactory和DynamicTableSinkFactory接口,但在source情况下如何进一步判断它是否支持lookup呢?
public DynamicTableSource createDynamicTableSource(Context context)
flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。
create table xxx_tbl (
k_op varchar, -- 操作类型
k_database varchar, -- 数据库名
k_table varchar, -- 表名
k_ts. BIGINT, -- binlog产生时间
idBIGINT,
name. varchar
) with (
'connector' = 'mysql-cdc',
.
'meta.fields-prefix' = 'k_'
官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
请不要截图哦
在 2021-06-23 09:47:46,"杨光跃" 写道:
1.12也支持的
| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制
在2021年6月23日 09:45,casel.chen 写道:
官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
1.12 Documentation: Queries
>| |
>杨光跃
>|
>|
>yangguangyuem...@163.com
>|
>签名由网易邮箱大师定制
>在2021年6月23日 10:09,casel.chen 写道:
>请不要截图哦
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-06-23 09:47:46,"杨光跃" 写道:
>
>1.12也支持的
上游有多个流需要经过flink作近实时关联,数据流有先到和后到问题,为了尽可能关联上,有什么办法可以实现延迟关联吗?每个流开个1分钟窗口再关联相同key的元素可以吗?用flink
sql如何实现?谢谢!
wStart, FROM source_event
> group by TUMBLE(ts, INTERVAL '10' SECOND), 主键
>
>
>2. 根据上一步的结果取top5
>select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY
>处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5
>
>| |
>杨光跃
>|
>|
>yangguangyuem...@163.
达一定时间再发出来
>datastream可以用窗口+side output来实现,但是sql没有side output实现,所以存在丢数据的情况
>
>> 2021年6月23日 上午8:27,casel.chen 写道:
>>
>> 上游有多个流需要经过flink作近实时关联,数据流有先到和后到问题,为了尽可能关联上,有什么办法可以实现延迟关联吗?每个流开个1分钟窗口再关联相同key的元素可以吗?用flink
>> sql如何实现?谢谢!
>
Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql
cdc写mysql遇到数据同步跟不上数据写入速度问题,何时能支持像mysql并行复制这种机制呢?
在 2021-06-16 17:27:14,"Leonard Xu" 写道:
>看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception,
>可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。
>
>祝好,
>Leonard
>
>> 在
引用 Leonard Xu大佬之前的回答:
> flink 1.13的jdbc connector新增 sink.parallism
> 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?
这个不仅在同步场景,在其他场景也需要注意 sink.parallism这个参数的使用,目前框架没有保证同一个pk的记录发送到同一个task,需要用户自己保证
sink 上定义的pk 和上游数据shuffle的key(比如 group key, join key)保持一致,
否则可能导致数据乱序。 这个社区也在从 plan
hive-stie.xml不在classpath下面,而是通过配置文件加载的: hiveConfig.getHiveConfDir() 例如
/opt/hive/conf 这个目录下有hive-site.xml
在 2021-06-11 13:59:20,"Rui Li" 写道:
>你好,
>
>看一下jar里面是不是有hive-site.xml文件呢?
>
>On Fri, Jun 11, 2021 at 10:37 AM casel.chen wrote:
>
>> 我在spri
有例子吗?或者相关资料连接也行
在 2021-06-11 12:40:10,"chenchencc" <1353637...@qq.com> 写道:
>使用事件时间就可以延时
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
等于业务down掉。
>2、如果只是简单的insert into xxx select
>xxx,就不用担心,runtime在遇到上下游并行度不一致时,如果有主键会按照主键hash的。
>
>
>在 2021-06-08 14:05:17,"casel.chen" 写道:
>>flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc
>>connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同
你的场景是怎样的呢?
>
>
>
>
>另外,https://issues.apache.org/jira/browse/FLINK-20374 是sink到es,但es
>connector并不支持指定sink.parallelism,也就是说sink端的并行度必然为上游的并行度。而jdbc
>connector是可以指定sink.parallelism的,只要与上游的并行度不一致,runtime就会根据pk做hash
>shuffle,确保相同pk的记录发到同一个sink task。
>
>
>在 2021-
'username' = 'mysqluser', 'password' = 'mysqluser',
>'table-name' = 'jdbc_sink')
>在 2021-05-18 11:55:46,"casel.chen" 写道:
>>我的flink sql作业如下
>>
>>
>>SELECT
>>product_name,
>>window_start,
>>window_end,
>>CAST(SUM(trans_amt)ASDECIMAL(24,2)
请问在native kubernetes上如何运行Flink History Server? 有没有相应的文档?
我用native kubernetes方式部署flink session
cluster,想修改某个包下的日志级别,如果直接修改configmap下的log4j-console.properties再重新部署是能生效的,但是通过命令行
(./bin/kubernetes-session.sh -Dkubernetes.cluster-id=xxx) 起flink session
cluster会将之前的修改冲掉,有什么办法可以保留下之前的修改吗?是否有命令行启动参数可以指定自定义的logging配置?谢谢!
没有人知道吗?
在 2021-05-13 17:20:15,"casel.chen" 写道:
flink sql怎样将change log stream转换成append log stream?
通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl +
group by timestamp这种方式聚合。
问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!
没有人知道吗?
在 2021-05-13 08:19:24,"casel.chen" 写道:
>flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。
>我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!
采用flink sql定义源表时,哪些connector支持提供部分字段,哪些connector必须要提供全量字段呢?
这边常用的connector主要有kafka, jdbc, clickhouse和mysql-cdc。
cdc是不是必须要提供对应表的全量字段呢?如果上游mysql业务表新增了字段,flink sql作业会不会出错?
kafka表定义是否支持部分字段?
我的flink sql作业如下
SELECT
product_name,
window_start,
window_end,
CAST(SUM(trans_amt)ASDECIMAL(24,2)) trans_amt,
CAST(COUNT(order_no)ASBIGINT) trans_cnt,
-- LOCALTIMESTAMP AS insert_time,
'微支付事业部'AS bus_name
FROM(
mysql sink表的定义如下
CREATE TABLE XXX (
) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT
flink sql如何修改执行计划?例如,修改上下游算子不同的并行度,或者将算子链人为打断等等。
我知道如何获取flink sql执行计划,但要怎么人为干预这个执行计划呢?还请大佬解答一下,谢谢!
flink sql怎样将change log stream转换成append log stream?
通过cdc接入了change log stream,后面就不能使用窗口聚合了,像Tumble和Hop, Session窗口。只能通过state ttl +
group by timestamp这种方式聚合。
问一下有没有办法将change log stream转换成append log stream,从而可以使用上述窗口聚合了呢?谢谢!
flink
sql运行在阿里云k8s用oss作为checkpoint存储介质,在作业启动过程中出错,请问这个NoSuchKey是指什么?flink在获取checkpoint作restore吗?
2021-05-21 10:56:10,278 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source:
TableSourceScan(table=[[default_catalog, default_database, kafka_source]],
fields=[id,
如下
CREATE TEMPORARY FUNCTION get_seniority(tenure INT64) AS (
CASE WHEN tenure < 1 THEN "analyst"
WHEN tenure BETWEEN 1 and 3 THEN "associate"
WHEN tenure BETWEEN 3 and 5 THEN "senior"
WHEN tenure > 5 THEN "vp"
ELSE "n/a"
END
);
SELECT name
,
flink sql支持Common Table Expression (CTE)吗?是不是可以通过 create temporary view xxx
来实现?CTE和temporary view的区别是什么?
例如
with toronto_ppl as (
SELECT DISTINCT name
FROM population
WHERE country = "Canada"
AND city = "Toronto"
)
, avg_female_salary as (
SELECT AVG(salary) as
flink sql作业:消费mysql binlog将数据同步到 mongodb
问题:
1. mysql-cdc connector只能设置成一个并行度吗?
2. 可以增大mongodb的sink并行度吗?可以的话,要如何设置?它保证主键相同的记录会发到同一个分区sink吗?
jdbc:mysql://host:3306/datav_test?useUnicode=true=utf8
本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢!
在 2021-05-19 17:52:01,"Michael Ran" 写道:
>
>
>
>数据库的字段字符编码
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2021-05-18 18:19:31,
LE’F%')])
DataStreamScan(id=[1], fields=[count, word]) DataStreamScan(id=[2],
fields=[count, word])
== Physical Execution Plan == Stage 1 : Data Source content : collect elements
with CollectionInputFormat
在 2021-05-25 10:40:46,"casel.chen" 写道:
>数据库字符编码设置如下
>
>
我有一个如下flink sql cdc作业,设置了'scan.startup.mode' =
'latest-offset'。但在作业重启后发现它又从头开始消费binlog,导致sink下游数据库频繁报duplicate key
error,有什么办法可以避免吗?
CREATE TABLE `mysql_source` (
`id` STRING,
`acct_id` STRING,
`acct_name` STRING,
`acct_type` STRING,
`acct_bal` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
有一个flink sql
mysql-cdc作业,从kafka消费canal-json格式的binlog数据,打到下游存储,遇到一个问题:上游源表字段是会动态添加的,而我的sql
table定义是静态的,如果上游schema发生变化,我的作业就会失败。在flink
sql中是否可以针对目标数据源只定义用到的字段?如果可以的话应该怎么实现?现在是会抛类似下面的error。意思是原本包含43列的数据表,在我的DDL中只定义了其中的4列。有哪些格式是支持定义部分字段的呢?
21/06/02 18:54:22 [Source:
如何根据flink sql解析出作业的血缘关系?找到类似这样的血缘关系:source table A --> lookup table B --> sink
table C
作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?
我的作业是用flink sql消费mysql cdc
binlog并实时同步到mongodb。如果只开一个并行度的话,mongodb的写入速度可能追不上mysql的写入。所以我需要在sink端开大并行度。
我不清楚用sql怎么写keyBy,是不是要group by
pk呢?我原来的想法是在MongoDBSinkFunction中开一个线程池,每个线程对应下游sink的一个并行度,每个线程带一个queue,MongoDBSinkFunction根据数据PK往对应的queue发数据,每个消费者线程从自己的queue
pull数据再进行批量插入。不知道这样可不可行?
flink postgres jdbc catalog是只读的吗?能写的catalog 除了Hive Catalog还有哪些?社区什么时候会有Mysql
JDBC Catalog呢?
flink sql调整算子并行度的方法有哪些?通过 sql hint 可以调整吗?
上游是binlog cdc消费获取的回撤流,现要使用flink sql统计分析该回撤流上每5分钟的sum值,不能使用常规tumble
window是吗?只能使用group by ts配合state TTL进行?
另外,问一下flink sql的state TTL只能是全局设置吗?能够通过在sql hint上添加从而可以细粒度控制吗?
flink sql cdc作数据同步,因为是基于库+表级别的,表数量太多导致作业数太多。请问能否用flink sql
cdc基于库级别同步?这样作业数量会少很多。
我有一个flink sql写的数据实时同步作业,从mysql binlog cdc消费发到mongodb,仅此而已,没有lookup,也没有join。
查看checkpoint页显示状态有17MB,checkpoint耗时要2s。
想知道为什么状态会如此之大,有没有状态查看工具看看里面到底存了什么信息?
使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete
操作的数据插入到文件系统,因为要做历史数据存档用。
查了下官网
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel
使用flink 1.12.1,作业依赖了flink-sql-connector-kafka (依赖kafka-client 2.4.1)
和一个消费阿里dts应用的connector(该connector依赖kafka-client
1.0.0),出现下面的异常后我将flink-sql-connector-kafka依赖的kafka-client也换成了1.0.0,结果运行作业还是报同样的异常。请问这是jar包冲突造成的吗?应该怎么解决?
Caused by: java.lang.LinkageError: loader constraint violation: loader
我在本地使用local运行模式运行flink sql,将数据从kafka写到mongodb,mongodb
connector是自己开发的,实现了CheckpointedFunction接口,debug的时候发现数据进来的时候有调用invoke方法,但没有调用initialState和snapshotState方法,我有设置enableCheckpoint,同样的程序使用kubernetes部署发现是会调用snapshotState方法。我的问题是:local运行模式下不会生成checkpoint吗?
flink运行在原生k8s上,现在想要修改Root Logger Level和动态添加 Logger Name -> Logger
Level,以及用户可以传入自定义的日志模板,目前有办法做到么?
我们使用k8s运行flink作业,作业日志存储在容器中,一旦作业挂了容器销毁了就没法获取出问题的日志,有什么办法可以将日志保存到oss上么?通过配置启动history
server吗?
flink运行批作业相较于流作业需要修改/添加哪些配置呢?
另外,使用方面有什么特别需要注意的地方吗?谢谢!
ent/resource-providers/native_kubernetes/#changing-the-log-level-dynamically
>
>Best,
>Yang
>
>casel.chen 于2021年7月9日周五 下午8:29写道:
>
>> flink运行在原生k8s上,现在想要修改Root Logger Level和动态添加 Logger Name -> Logger
>> Level,以及用户可以传入自定义的日志模板,目前有办法做到么?
场景:mysql数据实时同步到mongodb. 上游mysql binlog日志发到一个kafka topic,
不保证同一个主键的记录发到相同的partition,为了保证下游sink
mongodb同一主键的所有记录按序保存,所以需要按主键keyby。然后下游再批量写入mongodb。
问题:flink sql有办法解决上述问题?如果可以的话,要怎么写?
create table person_source (
id BIGINT PRIMARY KEY NOT FORCED,
name STRING,
age BIGINT
) with (
flink
run是否支持读取远程文件系统,例如oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File
jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。
我试着答k8s上部署flink
standalone集群,做HA之前集群是能够正常work的,在做HA的时候发现在configmap中添加了如下两个HA配置后JM就会抛异常,这是为什么?
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: oss:///odps-prd/rtdp/flink/recovery
2021-02-09 00:03:04,421 ERROR
最近试用flink kubernetes
application时发现TM不断申请再终止,而且设置的LoadBalancer类型的Rest服务一直没有ready,查看不到flink web
ui,k8s日志如下,这是什么原因?是因为我申请的资源太小么?
= 启动参数
"kubernetes.jobmanager.cpu": "0.1",
"kubernetes.taskmanager.cpu": "0.1",
"taskmanager.numberOfTaskSlots": "1",
"jobmanager.memory.process.size":
Flink on K8S Standalone模式下可以通过yaml启多个JM,但是在Native K8S模式下要如果做呢?有文档资料介绍吗?谢谢!
请问有flink + hudi或iceberg + aliyun oss的示例吗?谢谢!
请教一下flink sql多条数据sink用 statement set 语句时,
1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗?
2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb &
polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb &
polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?
flink cdc对接上游的mysql或pg业务库时遇到业务库大批量修数或schema变更是怎么处理的?
会不会瞬间产生很多changelog records打爆flink应用?如果会的话应该要如何避免呢?谢谢!
flink sql中如何使用异步io关联维表?官网文档有介绍么?
我在阿里云k8s上部署flink on native kubernetes application,默认用的服务暴露类型是
LoadBalancer,启动后会在公网暴露rest
url。运维管理人员不允许这样,说是只能使用固定预先申请的几个SLB,但我在flink官网没有找到有参数设置LoadBalancerIP,这样情况要怎么实现?
使用 flink sql 1.12.1时遇到三个问题:
1. GroupWindowAggregate doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[default_catalog,
default_database, mcsp_pay_log, ...
按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract
table转成append
flink window doesn't support retract stream 的话有什么workaround办法吗?常见的场景有 业务表cdc ->
kakfa -> flink按时间窗口聚合
如果业务表是只会insert的日志表,该如何将retract table转换成普通table?
GroupWindowAggregate doesn't support consuming update and delete changes which
is produced by node TableSourceScan(table=[[default_catalog,
我也遇到同样的问题
GroupWindowAggregate doesn't support consuming update and delete changes which
is produced by node TableSourceScan(table=[[default_catalog, default_database,
mcsp_pay_log, ...
按时间窗口聚合不支持上游是canal-json格式的cdc表的情况么?我的业务表其实是一张日志表,怎样用flink sql将retract
table转成append table?
flink run是否支持读取oss://或hdfs://路径下的jar文件?看源码是需要构建PakcagedProgram,而它的构造函数中有一个File
jarFile参数。不知是否能够从oss路径或hdfs路径构建出File对象。
private PackagedProgram(
@Nullable File jarFile,
List classpaths,
@Nullable String entryPointClassName,
Configuration configuration,
SavepointRestoreSettings
目标是用flink作业实现类似canal server的功能
CREATE TABLE `binlog_table` (
`id` INT,
`name` STRING,
`sys_id` STRING,
`sequence` INT,
`filter`
>发送时间:2021年4月21日(星期三) 下午2:16
>收件人:"user-zh"
>主题:Re: flink sql 不支持 mysql cdc 以 canal json 格式发到kafka吗?
>
>
>
>Hi casel.
>flink-cdc-connectors 是集成的 Debezium 引擎,应该不支持 Canal 格式。
>
>https://github.com/ververica/flink-cdc-connectors/blob/master/README.md
>
>ca
如题,在k8s环境下不想使用hdfs作为high-availability.storageDir,有没有办法直接使用oss呢?checkpoint和savepoint已经能够使用oss了。
在k8s上部署sesson模式的flink集群遇到jobmanager报如下错误,请问这是什么原因造成的?要如何fix?
2021-02-07 08:21:41,873 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.StandaloneDispatcher at
akka://flink/user/rpc/dispatcher_1 .
2021-02-07
目前是UTC时区的,怎样才能设置成当地的东8区呢?谢谢!
2021-02-19 01:34:21,259 INFO akka.event.slf4j.Slf4jLogger
[] - Slf4jLogger started
2021-02-19 01:34:22,155 INFO akka.remote.Remoting
[] - Starting remoting
2021-02-19 01:34:21,259 INFO
我的疑问正是flink cdc集成debezium后为何会把原始信息弄丢失了?直接采用原生的debezium或者canal同步数据固然可以。但如果flink
cdc直接能发出来的话不就可以节省这些组件和运维么?flink cdc设计的初衷也是如此。
在 2021-04-22 11:01:22,"飞翔" 写道:
既然这样,为何要用flink去同步信息,把信息的原始信息都丢失了。你可以直接采用原生的debezium或者canal同步数据,发送kafka,
比如canal的样例,虽然after
最近有个需求是用flink对接mysql binlog获取变更记录发到下游kafka(类似于canal
server或debezium功能),下游kafka消息需要有 before, after, op_type, ts, database, table
这些字段信息。我试了如下脚本发现出来的kafka消息只有data和op_type信息,其他信息都获取不到。追踪到上游debezium(flink
cdc是基于debezium实现的)发出来的record本身就只带data和op_type信息,问一下有没有别的办法获取到变更原始记录呢?
CREATE TABLE
flink 1.12.1版本,作业通过flink run命令提交,运行在native
k8s上,有个问题:作业日志要如何持久化下来?因为发现作业重启后,pod销毁,落在pod本地的日志也看不到了,不知道出错的root
cause。有没有办法将作业日志持久化下来?比如存到ELK或阿里云oss上面。另外,我们使用的是阿里云
EKS,是否可以配置作业使用阿里云日志服务呢?我知道k8s应用可以配置ecs宿主机路径映射存储,但这需要修改创建作业的yaml文件,不过我没有看到flink
on k8s启动命令有这个选项,求解答,谢谢!
pip list | grep apache
apache-beam2.27.0
apache-flink 1.13.2
apache-flink-libraries 1.13.2
$ python
Python 3.7.5 (v3.7.5:5c02a39a0b, Oct 14 2019, 18:49:57)
[Clang 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright",
上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink
sql支持动态创建sink table吗?
我们使用Flink运行实时作业在Kubernetes,发现作业实际使用的CPU资源远远小于作业请求量,但是将作业请求量降低后发现作业启动不了。请问这是个案还是正常情况?
例如,我们一个作业请求了0.5个cpu,但实际使用量只有0.09左右,修改请求为0.2个cpu,作业启动不了。
现在整个k8s集群有96个cpu,请求了86个cpu,实际使用只有7.5个cpu左右,这也相差太大了,有什么办法可以解决吗?
今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time
window,问一下官方是否打算sql支持count window呢?
如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!
为了监控TM OOM情况发生,我们在启动作业的时候添加了如下参数
-Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/oom.bin"
想在OOM发生的时候能生成HeapDumpFile,以便事后分析。
但是因为OOM时TM所在的pod会被销毁,因此想挂载一个网络盘持久化HeapDumpFile。
请问Flink on native k8s要如何自定义挂载盘呢?使用的Flink版本是1.12.5
redis和mongodb经常在工作中用到,但Flink官方一直没有提供这两个标准连接器,想问一下什么时候能正式release方便大家使用呢?
ps: behair库已经很久没更新了,对应的flink版本太低。
项目用到了下面依赖,使用flink-shaded-hadoop-2-uber因为使用到了YarnExecutor提交作业
org.apache.flink
flink-table-planner-blink_2.12
1.12.1
org.apache.flink
flink-shaded-hadoop-2-uber
2.8.3-10.0
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么?
在 2021-08-11 16:54:56,"Leonard Xu" 写道:
>这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级,
>DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的
>DDL的,
>只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。
>
源表三个字段 name, color, ts
按时间窗口聚合后想根据name group by取colors数组
create table source_table (
name STRING,
color STRING,
ts TIMESTAMP,
WATERMARK ts for ts
)
create table sink_table (
name STRING,
colors ARRAY
)
1. 请问这个select语句要怎么写?
select name, collect(color) as colors from
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件
源表数据
user_name, source_ip, ts
张三, 100, 00:08
张三, 104, 00:12
张三, 100, 00:15
张三, 101, 00:35
张三, 100, 00:38
张三, 102, 00:40
张三, 102, 00:45
张三, 101, 00:47
张三, 100, 00:55
张三, 100, 01:15
李四, 200, 01:17
李四, 200, 01:19
李四, 200, 01:27
王五,
共有 352 项搜索結果,以下是第 1 - 100 matches
Mail list logo