Re: Error when remotely debugging Flink from IDEA in a local environment

2021-07-09 Thread r pp
Compile the project successfully first, then debug. tangzhi8...@gmail.com wrote on Mon, Jun 28, 2021 at 3:02 PM: > Goal: remotely debug Flink from IDEA in a local environment > Steps: > 1. This is the Debug run configuration > 2. Error stack trace: > org.apache.flink.client.program.ProgramInvocationException: The main > method caused an error: Failed to execute job 'Streaming WordCount'. > at >
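For reference, remote debugging is usually wired up by passing JDWP options to the Flink JVM and attaching an IDEA "Remote JVM Debug" run configuration to the same port. A minimal sketch of the flink-conf.yaml line, assuming port 5005 (the port is a placeholder, not from this thread):

    env.java.opts: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

With suspend=y instead, the JVM waits for the debugger to attach before running, which helps when the failure happens during job submission.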

Re: Running Flink Dataset jobs Sequentially

2021-07-09 Thread Ken Krugler
FWIW I had to do something similar in the past. My solution was to… 1. Create a custom reader that added the source directory to the input data (so I had a Tuple2) 2. Create a job that reads from all source directories, using HadoopInputFormat for text 3. Constrain the parallelism of this
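A minimal sketch of the directory-tagging idea, using plain readTextFile in place of Ken's custom HadoopInputFormat reader (paths and the final aggregation are placeholders):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TaggedRead {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2); // constrain parallelism so inputs are processed with bounded resources
            String[] sourceDirs = {"/data/in/a", "/data/in/b"}; // hypothetical source directories
            DataSet<Tuple2<String, String>> all = null;
            for (String dir : sourceDirs) {
                // Tag every line with the directory it came from.
                DataSet<Tuple2<String, String>> tagged = env.readTextFile(dir)
                        .map(line -> Tuple2.of(dir, line))
                        .returns(Types.TUPLE(Types.STRING, Types.STRING));
                all = all == null ? tagged : all.union(tagged);
            }
            all.groupBy(0).first(5).print(); // e.g. sample a few records per directory
        }
    }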

Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Maciej Bryński
Hi Adrian, Could you share your state backend configuration? Regards, Maciek On Fri, Jul 9, 2021 at 19:09 Adrian Bednarz wrote: > > Hello, > > We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we > unexpectedly hit significant performance degradation when changing the
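For context, the state backend can be set in flink-conf.yaml or programmatically. A minimal sketch of the programmatic form in Flink 1.13 (the incremental flag and checkpoint path are illustrative assumptions, not Adrian's actual settings):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // true enables incremental checkpoints; RocksDB settings like this can strongly affect join performance.
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints"); // hypothetical path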

Re: Are checkpoints not generated in local execution mode?

2021-07-09 Thread Yun Tang
Hi, As long as checkpointing is enabled, checkpoints will be generated; this has nothing to do with the execution mode. Check the logs to see whether the JM is triggering checkpoints normally. Best, Yun Tang From: casel.chen Sent: Tuesday, June 29, 2021 9:55 To: user-zh@flink.apache.org Subject: Are checkpoints not generated in local execution mode? I am running Flink SQL locally in local execution mode, writing data from Kafka to MongoDB. MongoDB
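For completeness, checkpointing is enabled on the execution environment; a minimal sketch (the 10-second interval is an arbitrary example):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Trigger a checkpoint every 10 seconds; without this call no checkpoints are produced,
    // in local mode or otherwise.
    env.enableCheckpointing(10_000);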

Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Adrian Bednarz
Hello, We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we unexpectedly hit significant performance degradation when changing the state backend to RocksDB. We performed tests with two tables: fact table TXN and dimension table CUSTOMER with the following schemas: TXN: |--
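For readers trying to reproduce, a lookup join in Flink SQL is expressed with FOR SYSTEM_TIME AS OF against a processing-time attribute. A sketch with assumed names (TXN.customer_id, CUSTOMER.id, and proc_time are placeholders, not the actual schema from this thread):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
    // TXN and CUSTOMER are assumed to be registered already; CUSTOMER must be a lookup-capable source.
    tEnv.executeSql(
            "SELECT t.*, c.* " +
            "FROM TXN AS t " +
            "JOIN CUSTOMER FOR SYSTEM_TIME AS OF t.proc_time AS c " +
            "ON t.customer_id = c.id");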

How can the logging configuration be changed dynamically for Flink on native Kubernetes?

2021-07-09 Thread casel.chen
Flink is running on native Kubernetes. I want to change the root logger level, dynamically add Logger Name -> Logger Level entries, and let users supply a custom logging template. Is there currently a way to do this?
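One lever that is often relevant here: Flink ships a Log4j 2 configuration, and Log4j 2 can hot-reload its config file via monitorInterval, so editing the log4j.properties delivered to the pods (for example through the cluster's ConfigMap) can take effect without a restart. A sketch of the relevant lines, assuming Log4j 2 and a 30-second scan interval:

    # Re-read this file every 30 seconds and apply changes at runtime.
    monitorInterval = 30
    rootLogger.level = INFO
    # Hypothetical per-logger override added at runtime:
    logger.mylogger.name = org.apache.flink.runtime
    logger.mylogger.level = DEBUG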

Re: How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread Caizhi Weng
Hi! You can define your sink with the following schema: CREATE TABLE kafka_sink ( employee ROW<id STRING, name STRING> ) WITH ( 'connector' = 'kafka', 'format' = 'json' -- other properties... ); You can also insert into this sink with the following SQL: INSERT INTO kafka_sink SELECT ROW(id, name) FROM
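A fuller, self-contained sketch of the round trip, embedding the SQL in Java (topic names, bootstrap servers, and group id are placeholders):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
    tEnv.executeSql(
            "CREATE TABLE kafka_source (id STRING, name STRING) WITH (" +
            " 'connector' = 'kafka', 'topic' = 'flat_topic'," +
            " 'properties.bootstrap.servers' = 'localhost:9092'," +
            " 'properties.group.id' = 'demo', 'scan.startup.mode' = 'earliest-offset'," +
            " 'format' = 'json')");
    tEnv.executeSql(
            "CREATE TABLE kafka_sink (employee ROW<id STRING, name STRING>) WITH (" +
            " 'connector' = 'kafka', 'topic' = 'nested_topic'," +
            " 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json')");
    // ROW(id, name) packs the flat columns into the nested employee field.
    tEnv.executeSql("INSERT INTO kafka_sink SELECT ROW(id, name) FROM kafka_source");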

How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread 1095193...@qq.com
Hi community, I receive a JSON message from Kafka, convert the flat JSON to nested JSON, and send it back to Kafka. Message received from Kafka: {"id":"001","name":"wang"} Message sent back to Kafka: {"employee":{"id":"001","name":"wang"}} How can I do this in Flink SQL? 1095193...@qq.com

Kafka Consumer Retries Failing

2021-07-09 Thread Rahul Patwari
Hi, We have a Flink 1.11.1 streaming pipeline in production which reads from Kafka. The Kafka server version is 2.5.0 - Confluent 5.5.0. The Kafka client version is 2.4.1 - {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":"<init>"}

Parameters are not passed correctly to a user-defined function

2021-07-09 Thread Chenzhiyuan(HR)
I defined a table sourced from Kafka and called a user-defined function in a SQL query, but found that the parameters are not passed correctly to the UDF's eval method. I am using Flink 1.10.0. The DDL for the JSON source is as follows: private static final String personKafkaTable = "CREATE TABLE hw_person_normal_t(\n" + " data ARRAY<...>,\n" + " key STRING,\n" + " operation STRING\n" + ") with (\n" +
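For reference, the usual shape of a scalar UDF in the 1.10 Table API is sketched below; parameter passing depends on eval's signature matching the SQL argument types (names and types here are illustrative, not from the original post):

    import org.apache.flink.table.functions.ScalarFunction;

    public class NormalizeName extends ScalarFunction {
        // Flink resolves the SQL call against this signature; a mismatch between the
        // SQL argument types and these parameter types is a common cause of bad arguments.
        public String eval(String name) {
            return name == null ? null : name.trim().toUpperCase();
        }
    }

    // Registration in Flink 1.10:
    // tableEnv.registerFunction("normalize_name", new NormalizeName());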

Re: Job Recovery Time on TM Lost

2021-07-09 Thread Till Rohrmann
Gen is right in his explanation of why dead TM discovery can be faster with Flink < 1.12. Concerning flaky TaskManager connections: 2.1 I think the problem is that the receiving TM does not know the container ID of the sending TM. It only knows its address. But this is something one could
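For context on the detection timing, the relevant flink-conf.yaml keys are the heartbeat settings; the values below are the stock defaults, not a recommendation:

    heartbeat.interval: 10000   # ms between heartbeat requests
    heartbeat.timeout: 50000    # ms of silence before a TM is declared dead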