1、flink程序资源有限,kafka中数据比较多,想要按一定比例丢弃数据(或者其它策略),减轻flink 程序压力,有什么方法吗?
您好,jincheng老师,我已经验证了您提供的这种分开处理的逻辑,可以解决我的问题,非常感谢您的解惑
Best,
Jack
在 2020-06-22 14:28:04,"jincheng sun" 写道:
您好,jack:
Table API 不用 if/else 直接用类似逻辑即可:
val t1 = table.filter('x > 2).groupBy(..)
val t2 = table.filter('x <= 2).groupBy(..)
t1.insert_into("sink1)
使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?
场景:使用pyflink通过filter进行条件过滤后插入到sink中,
比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中:
{
"logType":"syslog",
"message":"sla;flkdsjf"
}
{
"logType":"alarm",
"message":"sla;flkdsjf"
}
t_env.from_path("source")\
e('taxiidcnt')
>> .key_delimiter("$")) \
在 2020-06-16 15:38:28,"Dian Fu" 写道:
>I guess it's because the ES version specified in the job is `6`, however, the
>jar used is `5`.
>
>> 在 2020年6月16日,下午1:47,jack 写道:
>>
>>
我这边使用的是pyflink连接es的一个例子,我这边使用的es为5.4.1的版本,pyflink为1.10.1,连接jar包我使用的是
flink-sql-connector-elasticsearch5_2.11-1.10.1.jar,kafka,json的连接包也下载了,连接kafka测试成功了。
连接es的时候报错,findAndCreateTableSink failed。
是不是es的连接jar包原因造成的?哪位遇到过类似问题还请指导一下,感谢。
Caused by Could not find a suitable factory for
hi
感谢您的建议,我这边尝试一下自定义实现sink的方式。
Best,
Jack
在 2020-06-15 18:08:15,"godfrey he" 写道:
hi jack,jincheng
Flink 1.11 支持直接将select的结果collect到本地,例如:
CloseableIterator it = tEnv.executeSql("select ...").collect();
while(it.hasNext()) {
it.next()
}
但是 pyflink 还
感谢您的建议,目前在学习使用pyflink,使用pyflink做各种有趣的尝试,包括udf函数做日志解析等,也看过
目前官方文档对于pyflink的文档和例子还是偏少,遇到问题了还是需要向各位大牛们多多请教。
Best,
Jack
在 2020-06-15 16:13:32,"jincheng sun" 写道:
>你好 Jack,
>
>> pyflink 从source通过sql对数据进行查询聚合等操作 不输出到sink中,而是可以直接作为结果,
>我这边可以通过开发web接口直接查询这个结果,不必去si
问题请教:
描述: pyflink 从source通过sql对数据进行查询聚合等操作
不输出到sink中,而是可以直接作为结果,我这边可以通过开发web接口直接查询这个结果,不必去sink中进行查询。
flink能否实现这样的方式?
感谢
非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
在 2020-06-01 20:50:53,"Xingbo Huang" 写道:
Hi,
其实这个是CSV
connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
st_env.connect(
Kafka()
.version("0.11")
.topic("logSink")
请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,
数据输入:
{"topic": "logSource", "message": "x=1,y=1,z=1"}
发送到kafka里面的数据结果如下:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。
您理解的是对的,我测试了下,好像pyflink的udf函数不太支持python的可变参数
在 2020-06-01 14:47:21,"Dian Fu" 写道:
>你传的第二个参数是string,这样试一下?
>select("drop_fields(message, array('x'))")
>
>不太确定我是否理解了你的问题(如果上面不行的话,建议你发一下exception)
>
>> 在 2020年6月1日,下午1:59,jack 写道:
>>
&g
是的,对应参数没有填写正确,感谢;
另外请教,udf函数是不是目前不支持python的可变参数,我使用可变参数依然会报错参数不对的问题。
在 2020-06-01 11:01:34,"Dian Fu" 写道:
>The input types should be as following:
>
>input_types=[DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())]
>
>Regards,
>Dian
>
>> 在 2020年6月1日,上午10:49,刘亚坤 写道:
>>
>>
14 matches
Mail list logo