退订

2022-11-15 文章 jack zhang

退订

2022-08-08 文章 jack zhang

如何按比例丢弃kafka中消费的数据

2022-02-25 文章 jack zhang
1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?

Re:Re: pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

2020-06-22 文章 jack
您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑 Best, Jack 在 2020-06-22 14:28:04,"jincheng sun" 写道: 您好,jack: Table API 不用 if/else 直接用类似逻辑即可: val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("sink1)

pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中

2020-06-18 文章 jack
使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中? 场景:使用pyflink通过filter进行条件过滤后插入到sink中, 比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: { "logType":"syslog", "message":"sla;flkdsjf" } { "logType":"alarm", "message":"sla;flkdsjf" } t_env.from_path("source")\

Re:Re: pyflink连接elasticsearch5.4问题

2020-06-16 文章 jack
e('taxiidcnt') >> .key_delimiter("$")) \ 在 2020-06-16 15:38:28,"Dian Fu" 写道: >I guess it's because the ES version specified in the job is `6`, however, the >jar used is `5`. > >> 在 2020年6月16日,下午1:47,jack 写道: >> >>

pyflink连接elasticsearch5.4问题

2020-06-15 文章 jack
我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是 flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。 连接es的时候报错,findAndCreateTableSink failed。 是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。 Caused by Could not find a suitable factory for

Re: pyflink数据查询

2020-06-15 文章 jack
hi 感谢您的建议,我这边尝试一下自定义实现sink的方式。 Best, Jack 在 2020-06-15 18:08:15,"godfrey he" 写道: hi jack,jincheng Flink 1.11 支持直接将select的结果collect到本地,例如: CloseableIterator it = tEnv.executeSql("select ...").collect(); while(it.hasNext()) { it.next() } 但是 pyflink 还

Re:Re: pyflink数据查询

2020-06-15 文章 jack
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过 目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。 Best, Jack 在 2020-06-15 16:13:32,"jincheng sun" 写道: >你好 Jack, > >> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果, >我这边可以通过开发web接口直接查询这个结果,不必去si

pyflink数据查询

2020-06-09 文章 jack
问题请教: 描述: pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。 flink能否实现这样的方式? 感谢

Re: pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题

2020-06-01 文章 jack
非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教 在 2020-06-01 20:50:53,"Xingbo Huang" 写道: Hi, 其实这个是CSV connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。 st_env.connect( Kafka() .version("0.11") .topic("logSink")

pyflink 使用udf函数 处理成json字符串发送到kafka的格式问题

2020-06-01 文章 jack
请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理, 数据输入: {"topic": "logSource", "message": "x=1,y=1,z=1"} 发送到kafka里面的数据结果如下: "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}" 又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。

Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

2020-06-01 文章 jack
您理解的是对的,我测试了下,好像pyflink的udf函数不太支持python的可变参数 在 2020-06-01 14:47:21,"Dian Fu" 写道: >你传的第二个参数是string,这样试一下? >select("drop_fields(message, array('x'))") > >不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception) > >> 在 2020年6月1日,下午1:59,jack 写道: >> &g

Re:Re: pyflink Table Api 使用udf函数报错 Given parameters do not match any signature.

2020-05-31 文章 jack
是的,对应参数没有填写正确,感谢; 另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。 在 2020-06-01 11:01:34,"Dian Fu" 写道: >The input types should be as following: > >input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())] > >Regards, >Dian > >> 在 2020年6月1日,上午10:49,刘亚坤 写道: >> >>