Hi, all
flink1.12 Blink planner有人遇到过这样的问题么:
下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx),
Row.class).addSink(xxxRichSinkFunction);
xxxRichMapFunction中对某个字段写了row.setField(index,
Hi, all
flink1.12 Blink planner有人遇到过这样的问题么:
下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx),
Row.class).addSink(xxxRichSinkFunction);
xxxRichMapFunction中对某个字段写了row.setField(index,
退订
qhp...@hotmail.com
Hi Ada,
sql-gateway之前没有维护起来,确实是一个遗憾。
最近我们也关注到大家对batch的兴趣越来越浓,sql-gateway还会继续维护。
btw,非常欢迎分享一下你们使用Flink替换Spark遇到的一些痛点,我们会逐渐去解决这些痛点
Best,
Godfrey
Ada Wong 于2022年1月12日周三 10:09写道:
>
> cc tsreaper and Godfrey He
>
> 文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
>
> >
> >
cc tsreaper and Godfrey He
文末丶 <809097...@qq.com.invalid> 于2022年1月10日周一 19:39写道:
>
> 试下https://github.com/DataLinkDC/dlink 看看能不能满足你的需求
>
>
>
>
> --原始邮件--
> 发件人:
>
Hi, all
flink1.12 Blink planner有人遇到过这样的问题么:
下面是简化的逻辑
DataStream ds = .map(xxxRichMapFunction);
Table table = tableEnv.fromDataStream(ds);
tableEnv.toAppendStream(table.select(xxx).keyBy(xxx),
Row.class).addSink(xxxRichSinkFunction);
xxxRichMapFunction中对某个字段写了row.setField(index,
退订
目前这个更多还是一个经验值,和具体业务有关使用有关,建议任务运行后观察JM和TM的GC情况后再做调整
许友昌 <18243083...@163.com> 于2022年1月10日周一 15:18写道:
> 请问在启动flink 任务时,要如何确定该分配多少内存给 jobmanager,分配多少给 taskmanager,当我们指定 -ytm 1024
> 或 -ytm 2048 的依据是什么?
在生产环境中使用Flink是批示作业是OK的,不是很依赖Flink Remote Shuffle Service
Flink Remote Shuffle Service
主要解决大数据量Shuffle场景下的稳定性,目前Batch会将Shuffle的结果写本地磁盘,数量大的时候会容易将磁盘写满,稳定性也相对比较差
casel.chen 于2021年12月2日周四 08:26写道:
> GlobalStreamExchangeMode 这几种交换模式的不同和使用场景是什么?哪些适合流式作业,哪些适合批式作业?
> Flink Remote Shuffle
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature
黄志高 于2021年12月1日周三 21:53写道:
> hi,各位大佬,咨询个问题
>
>
>
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature
黄志高 于2021年12月2日周四 14:14写道:
> |
>
>
>
>
> 32684
> |
> COMPLETED
> | 8/8 | 13:52:36 | 13:52:38 | 2s | 126 KB | 0 B |
> | | 32683 |
> COMPLETED
> | 8/8 | 13:42:36 | 13:42:39 | 2s | 126 KB | 0 B |
> | | 32682 |
>
hi,
设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。
在2022年1月11日 16:53,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?
在 2022-01-11 11:10:41,"Caizhi Weng" 写道:
Hi!
可以设置 parallelism.default 为需要的并发数。
你可以通过环境变量或者flink config option的方式来指定kube config
export KUBECONFIG=/path/of/kube.config
或者
-Dkubernetes.config.file=/path/of/kube.config
具体的代码在这里[1]
[1].
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的
Jeff 于2022年1月9日周日 19:45写道:
> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
hi
是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10
Best
JasonLee
在2022年01月11日 16:52,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?
在 2022-01-11 11:10:41,"Caizhi Weng" 写道:
Hi!
可以设置 parallelism.default
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的
Caizhi Weng 于2022年1月11日周二 11:11写道:
> Hi!
>
> 可以设置 parallelism.default 为需要的并发数。
>
> Jeff 于2022年1月9日周日 19:44写道:
>
> > 当source为kafka时,最大并发度由kafka分区决定的,
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?
在 2022-01-11 11:10:41,"Caizhi Weng" 写道:
>Hi!
>
>可以设置 parallelism.default 为需要的并发数。
>
>Jeff 于2022年1月9日周日 19:44写道:
>
>> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
17 matches
Mail list logo