Re: Re:re:Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?
Hi 用github账号登陆之后,可以使用添加package的方式[1]自行上传共享。 [1] https://flink-packages.org/new-package 祝好 唐云 From: casel.chen Sent: Thursday, September 23, 2021 12:40 To: user-zh@flink.apache.org Cc: myasuka Subject: Re:re:Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器? >我司基于最新提供流批一体的接口,实现了mongodb的连接器,支持source和sink,实现了控制批量插入频率、控制缓存批的数据量和mongo文档对象和java对象转换,同时还可选择批量更新,并且使用的mongo最新异步驱动,后期我还会不断优化性能,看大佬能否推动一下,把这个连接器贡献给社区 问一下自己开发的连接器要怎么添加到 https://flink-packages.org/ 网站给大家搜索到?这位朋友能够将你们的连接器贡献上去呢? 在 2021-09-23 09:32:39,"2572805166" <2572805...@qq.com.INVALID> 写道: >我司基于最新提供流批一体的接口,实现了mongodb的连接器,支持source和sink,实现了控制批量插入频率、控制缓存批的数据量和mongo文档对象和java对象转换,同时还可选择批量更新,并且使用的mongo最新异步驱动,后期我还会不断优化性能,看大佬能否推动一下,把这个连接器贡献给社区 > > >-- 原始邮件 -- >发件人: "Yun Tang"; >发件时间: 2021-09-22 10:55 >收件人: "user-zh@flink.apache.org"; >主题: Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器? > > > >Hi, > >其实目前Flink社区并不是那么欢迎新增官方支持的connector,主要原因就是社区的开发人员有限,没有精力维护太多的connector,尤其是一些connector的实现需要一定的相关背景,但很难保证review代码的开发人员具有相关背景,毕竟大家都需要为自己approve的代码负责。 >你可以在 flink-packages [1] 里面找一下,或者考虑自己实现并维护(基础实现应该是复杂度不高的)。 > > >[1] https://flink-packages.org/ > > >祝好 >唐云 > > >From: 黑色 >Sent: Saturday, September 18, 2021 17:17 >To: user-zh@flink.apache.org >Subject: 回复:Flink SQL官方何时能支持redis和mongodb连接器? > >这个可以自已定义一个,参考源码写一个,自己写出来的才是自己的,直接用别人的还是别人的 > > > > >-- 原始邮件 -- >发件人: > "user-zh" > >发送时间: 2021年9月17日(星期五) 下午4:39 >收件人: "user-zh@flink.apache.org" >主题: Flink SQL官方何时能支持redis和mongodb连接器? > > > >redis和mongodb经常在工作中用到,但Flink官方一直没有提供这两个标准连接器,想问一下什么时候能正式release方便大家使用呢? >ps: behair库已经很久没更新了,对应的flink版本太低。
?????? flink sql????????????????sink table?
iPhone -- -- ??: JasonLee <17610775...@163.com> : 2021??9??23?? 21:57 ??: user-zh@flink.apache.org https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like casel.chen ??2021??9??18?? 8:27?? kafka topic??topic) ??flink sqlsink table
?????? flink sql????????????????sink table?
Hi , SQL SQL ??,?? Best JasonLee ??2021??09??23?? 09:28 ?? sql??sql?? iPhone -- -- ??: 2572805166 <2572805...@qq.com.INVALID> : 2021??9??23?? 09:23 ??: user-zh https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like casel.chen ??2021??9??18?? 8:27?? kafka topic??topic) ??flink sqlsink table
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年09月23日 15:20,wangjingen<13033709...@163.com> 写道: 退订
??????????
Hi ?? user-zh-unsubscr...@flink.apache.org Best JasonLee ??2021??09??23?? 21:22??Night_xing<1029681...@qq.com.INVALID> ??
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年09月23日 15:26,金圣哲 写道: 退订
????
flink on native k8s 资源弹性扩容问题
目前生产上环境作业参数0.2(cpu),5G 平常增量跑的时候cpu占用率不到5%,上游数据全量初始化时经常会把CPU打满 想问下:flink能否做到弹性扩容?当pod的request cpu打满时自动增加cpu,当高峰期过后处于增量阶段时再收回部分pod资源?
Re: Flink SQL是否支持Count Window函数?
这个目前还不支持,但是可以基于TVF来实现,现在已经建了一个jira了: https://issues.apache.org/jira/browse/FLINK-24002 Caizhi Weng 于2021年9月22日周三 上午11:17写道: > Hi! > > 据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。 > > 不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为 > datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。 > > EnvironmentSettings settings = EnvironmentSettings.newInstance(). > inStreamingMode().build(); > StreamTableEnvironment tEnv = > StreamTableEnvironment.create( > StreamExecutionEnvironment.getExecutionEnvironment(), settings); > tEnv.executeSql( > "CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH > ( 'connector' = 'datagen' )"); > Table table = tEnv.sqlQuery("SELECT key, val FROM T"); > DataStream dataStream = tEnv.toDataStream(table); > DataStream> summedStream = > dataStream > .keyBy(row -> (int) row.getField(0)) > .countWindow(100) > .apply( > (WindowFunction< > Row, > Tuple2, > Integer, > GlobalWindow>) > (key, window, input, out) -> { > int sum = 0; > for (Row row : input) { > Integer field = (Integer) row.getField(1); > if (field != null) { > sum += field; > } > } > out.collect(Tuple2.of(key, sum)); > }) > .returns( > new TupleTypeInfo<>( > BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); > Table summedTable = tEnv.fromDataStream(summedStream); > tEnv.registerTable("S", summedTable); > tEnv.executeSql("SELECT f0, f1 FROM S").print(); > > casel.chen 于2021年9月17日周五 下午6:05写道: > > > 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time > > window,问一下官方是否打算sql支持count window呢? > > 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢! >
????????????????UDF????????????
??HBASE??kafkauserId??OK -- -- ??: "user-zh"
自定义函数UDF函数相关疑问
Hi,您好 我有个需求,具体逻辑是:用 Flink 读 Kafka Topic 的用户行为数据,关联 Hbase 中用户的详细信息后输出。有个优化点是:在关联 Hbase 时会先通过 Topic 中的 userid 查 Hbase,并先将Hbase中的用户详细信息保存到状态中。这样有个好处:假设状态过期时间是6小时,用户在6小时之内有行为将会先从状态中读取。具体伪代码如下: env.addSource(new FlinkKafkaConsumer011(...)) .map(message ->(userid,actions)) .keyby(userid) .process(new MyHbaseStateFunction) .print() 我想问的点是:如果用Flink Sql 实现上述功能的话,可能需要新增一个Flink UDF。但是类似上述场景,在 keyby之后,要获取同一个userid的详细信息,只需要读特定的task内的状态即可。如果用Flink UDF实现的话,我该如何将数据通过userid分组。
Re:Re: flink消费kafka过程中,发现我的CDH上的kafka异常,请大佬帮忙看看
大佬,我试过了,还是不行,暗地里一直反复生产消息。帮忙看看哈! kafka-console-consumer --bootstrap-server xx001:9092,xx002:9092,xx003:9092 --topic bigdata4 --group myConsumerGroup 生产者(发送1和2): 21/09/23 18:03:57 INFO utils.AppInfoParser: Kafka version: 2.2.1-cdh6.3.1 21/09/23 18:03:57 INFO utils.AppInfoParser: Kafka commitId: unknown >21/09/23 18:03:57 INFO clients.Metadata: Cluster ID: vKZ_bka0Qziw5bBCRR3Ftg 1 >2 > 消费者(接受到了无穷个1和2,除非把这个topic删除,否则一直发送,消费): 1 2 1 2 1 2 2 1 2 1 2 1 1 2 1 2 1 2 1 1 2 1 2 1 2 1 1 2 1 2 1 2 在 2021-09-23 18:24:37,"guozhi mang" 写道: >看不到你的图片内容,有尝试过更换消费者组吗? > >Geoff nie 于2021年9月23日周四 下午6:06写道: > >> 各位大佬好: >> >> 之前CDH集群被重新安装了,服务正常。但是在linux下打开生产者,打开消费者,再在生产者中发送一条消息,消费者一直反复消费消息,但是日志也不报异常。请大佬们帮忙看看,万分感谢! >> 生产者: >> 消费者: >> >> >> >>
Re: flink消费kafka过程中,发现我的CDH上的kafka异常,请大佬帮忙看看
看不到你的图片内容,有尝试过更换消费者组吗? Geoff nie 于2021年9月23日周四 下午6:06写道: > 各位大佬好: > > 之前CDH集群被重新安装了,服务正常。但是在linux下打开生产者,打开消费者,再在生产者中发送一条消息,消费者一直反复消费消息,但是日志也不报异常。请大佬们帮忙看看,万分感谢! > 生产者: > 消费者: > > > >
flink消费kafka过程中,发现我的CDH上的kafka异常,请大佬帮忙看看
各位大佬好: 之前CDH集群被重新安装了,服务正常。但是在linux下打开生产者,打开消费者,再在生产者中发送一条消息,消费者一直反复消费消息,但是日志也不报异常。请大佬们帮忙看看,万分感谢! 生产者: 消费者:
回复:Flink Session 模式Job日志区分
你不同job任务日志上做一个区别LOG_PREFIX private static final String LOG_PREFIX = "【WF事件组件下发缓存处理器】"; log.info("|prefix={} ☀️☀️☀️☀️ 进行订阅事件缓存处理开始|message={}|componentEvent={}|", LOG_PREFIX, message, componentEvent); 陈卓宇 -- 原始邮件 -- 发件人: "user-zh"
退订
退订
退订
退订