Re: How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-17 Thread Yang Wang
Thanks Tao for providing your internal use case. I have created a ticket for this feature[1]. [1]. https://issues.apache.org/jira/browse/FLINK-24332 Best, Yang tao xiao wrote on Saturday, September 11, 2021 at 10:18 AM: > Thanks David for the tips. We have been running Flink with no performance > degradation observed
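
Once pod template support is available (Flink 1.13+), a PVC mount can be sketched roughly as below; the claim name `flink-data-pvc` and the mount path are hypothetical placeholders, not from the thread:

```yaml
# taskmanager-pod-template.yaml -- sketch only; claim name and path are placeholders
apiVersion: v1
kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  containers:
    # Flink requires the main container to be named flink-main-container
    - name: flink-main-container
      volumeMounts:
        - name: flink-data
          mountPath: /opt/flink/data
  volumes:
    - name: flink-data
      persistentVolumeClaim:
        claimName: flink-data-pvc
```

The template would then be referenced from the deployment, e.g. via the `kubernetes.pod-template-file.taskmanager` option.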

Re: RocksDB state not cleaned up

2021-09-17 Thread tao xiao
Thanks for the feedback! However, TTL already proves that the state cannot be cleaned up on time due to too many levels built up in RocksDB. Hi @Yun Tang, do you have any suggestions for tuning RocksDB to accelerate compaction progress? On Fri, Sep 17, 2021 at 8:01 PM David Morávek wrote: >
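
As a starting point for speeding up compaction (not from this thread; the values are illustrative only and need benchmarking against the actual workload), the RocksDB-related options in flink-conf.yaml can be tuned, for example:

```yaml
# Illustrative values only -- tune against your own workload
state.backend.rocksdb.thread.num: 4                              # more background flush/compaction threads
state.backend.rocksdb.compaction.level.use-dynamic-size: true    # recommended for space amplification
state.backend.rocksdb.compaction.level.max-size-level-base: 512mb
state.backend.rocksdb.compaction.level.target-file-size-base: 64mb
```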

Re: Built-in functions to manipulate MULTISET type

2021-09-17 Thread JING ZHANG
Hi Kai, AFAIK, there is no built-in function to extract the keys of a MULTISET into an ARRAY. Defining a UDF is a good solution. Best, JING ZHANG Kai Fu wrote on Saturday, September 18, 2021 at 7:35 AM: > Hi team, > > We want to know if
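
A minimal sketch of the eval logic such a UDF could wrap. In a real Flink job this would live in a class extending `org.apache.flink.table.functions.ScalarFunction`; the class and method names here are hypothetical:

```java
import java.util.Map;

// Core logic for a hypothetical "multiset keys" UDF: a Flink MULTISET<T>
// surfaces in a UDF as a Map from element to its multiplicity, so the
// "keys" are simply the map's key set.
public class MultisetKeys {
    // In a real UDF this would be an instance method `eval` on a
    // ScalarFunction subclass, registered with the TableEnvironment.
    public static String[] eval(Map<String, Integer> multiset) {
        if (multiset == null) {
            return new String[0];
        }
        return multiset.keySet().toArray(new String[0]);
    }
}
```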

Re: Invalid flink-config keeps going and ignores bad config values

2021-09-17 Thread Yangze Guo
AFAIK there is not. Flink will just skip the invalid lines. Best, Yangze Guo On Sat, Sep 18, 2021 at 7:00 AM Dan Hill wrote: > > Hi. I noticed my flink-config.yaml had an error in it. I assumed a bad > config would stop Flink from running (to catch errors earlier). Is there a > way I can

Checkpoint failures caused by short HOP windows

2021-09-17 Thread xiaohui zhang
Flink: 1.12.1. Source: Kafka, create table dev_log ( devid, ip, op_ts ) with ( connector = kafka ). Sink: HBase connector 2.2. We are currently using a Flink SQL hop window to build a metric counting the number of IPs associated with each device over the past 24 hours, with a checkpoint every 30 min and a 30 min timeout. The SQL is: insert into h_table select devid as rowkey row(hop_end, ip_cnt) from ( select devid,
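
A hedged reconstruction of the kind of query described (names follow the message; the exact column list and intervals are guesses, since the original SQL is truncated):

```sql
-- Sketch only: 24 h hopping window sliding every 30 min,
-- counting distinct IPs per device
INSERT INTO h_table
SELECT devid AS rowkey,
       ROW(hop_end, ip_cnt)
FROM (
  SELECT devid,
         HOP_END(op_ts, INTERVAL '30' MINUTE, INTERVAL '24' HOUR) AS hop_end,
         COUNT(DISTINCT ip) AS ip_cnt
  FROM dev_log
  GROUP BY devid, HOP(op_ts, INTERVAL '30' MINUTE, INTERVAL '24' HOUR)
);
```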

Does Flink SQL support dynamically creating sink tables?

2021-09-17 Thread casel.chen
Messages in the upstream Kafka topic carry a user-type field, and we now want to route data to different topics based on user type (we don't want to hard-code the set of types, to keep it extensible). Does Flink SQL support dynamically creating sink tables?
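
Flink SQL aside, a common DataStream-API pattern is to pick the topic per record inside the serialization schema (a `KafkaSerializationSchema` returns a `ProducerRecord`, whose topic can be computed). The routing itself can be as simple as this sketch; the class name, prefix, and fallback topic are hypothetical:

```java
// Hypothetical topic router: derives a target topic from a user-type field.
// In a Flink job this would be called from a KafkaSerializationSchema when
// building the ProducerRecord, so new user types need no code changes.
public class TopicRouter {
    private final String prefix;

    public TopicRouter(String prefix) {
        this.prefix = prefix;
    }

    public String topicFor(String userType) {
        // Fall back to a dead-letter-style topic for missing types
        if (userType == null || userType.isEmpty()) {
            return prefix + "unknown";
        }
        return prefix + userType.toLowerCase();
    }
}
```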

Built-in functions to manipulate MULTISET type

2021-09-17 Thread Kai Fu
Hi team, We want to know if there is any built-in function to extract the keys of a MULTISET into an ARRAY. There is no such function as far as we can find, other than defining a simple wrapper UDF for it, please

Invalid flink-config keeps going and ignores bad config values

2021-09-17 Thread Dan Hill
Hi. I noticed my flink-config.yaml had an error in it. I assumed a bad config would stop Flink from running (to catch errors earlier). Is there a way I can enable a strict parsing mode so any Flink parsing issue causes Flink to fail? I don't see one when looking at the code. 2021-09-17
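
Flink itself only logs and skips bad lines, but a pre-flight check in CI can fail fast before deployment. A rough stdlib-only sketch (the `key: value` shape check is deliberately simplistic and only an illustration, not Flink's actual parser):

```java
import java.util.ArrayList;
import java.util.List;

// Naive pre-flight check for flink-conf.yaml style files: the legacy
// Flink parser expects simple "key: value" lines, so flag anything else
// (ignoring blank lines and comments). Real YAML is richer; sketch only.
public class ConfCheck {
    public static List<String> invalidLines(List<String> lines) {
        List<String> bad = new ArrayList<>();
        for (String line : lines) {
            String t = line.trim();
            if (t.isEmpty() || t.startsWith("#")) {
                continue;
            }
            // Require a colon-space separator and a non-empty value
            int i = t.indexOf(": ");
            if (i <= 0 || t.substring(i + 2).trim().isEmpty()) {
                bad.add(line);
            }
        }
        return bad;
    }
}
```

Running such a check before `flink run` (and failing the pipeline on a non-empty result) approximates the strict mode asked about here.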

RE: hdfs lease issues on flink retry

2021-09-17 Thread Shah, Siddharth
Hi Matthias, Thanks for looking into the issue and creating a ticket. I am thinking of having a workaround until the issue is fixed. What if I create the attempt directories with a random int by patching HadoopOutputFormatBase’s open() method? Original: TaskAttemptID taskAttemptID =

Re: Building a flink connector

2021-09-17 Thread Martijn Visser
Hi Lars, We're actually working on a guide to help our users create a connector. Perhaps we could use your needs to see what we should include in such a guide, so we can make it available to the community. It would be great if we can have a conversation on that

Re: Building a flink connector

2021-09-17 Thread Yuval Itzchakov
Hi Lars, We've built a custom connector for Snowflake (both source and sink). Feel free to reach out in private if you have any questions. On Fri, Sep 17, 2021, 14:33 Lars Skjærven wrote: > We're in need of a Google Bigtable flink connector. Do you have any tips > on how this could be done,

Re: Cleaning old incremental checkpoint files

2021-09-17 Thread Yun Tang
Hi Robin, You could use Checkpoints#loadCheckpointMetadata[1] to analyze the checkpoint metadata. For the problem of making checkpoints self-contained, you might be interested in the ticket [2] [1]

Re: Flink S3A failed to connect to service endpoint from IntelliJ IDE

2021-09-17 Thread Yun Gao
Hi James, For one thing, it looks to me that we should not configure the credentials in pom.xml; instead, we might introduce a core-site.xml under the classpath, configured with fs.s3a.access.key and fs.s3a.secret.key. I tried with the
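
The core-site.xml suggested above might look like this (the values are placeholders):

```xml
<?xml version="1.0"?>
<!-- core-site.xml on the classpath; values are placeholders -->
<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```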

Re: RocksDB state not cleaned up

2021-09-17 Thread David Morávek
Cleaning up with timers should solve this. Both approaches have some advantages and disadvantages though. Timers: - No "side effects". - Can be set in event time. Deletes are regular tombstones that will get compacted later on. TTL: - Performance. This costs literally nothing compared to an

Exception by flink kafka

2021-09-17 Thread Ragini Manjaiah
Hi, in what scenarios do we hit java.lang.OutOfMemoryError: Java heap space while publishing to Kafka? I hit this exception, and as a resolution added the property .setProperty("security.protocol","SSL") in the Flink application. Later I started encountering

Building a flink connector

2021-09-17 Thread Lars Skjærven
We're in need of a Google Bigtable flink connector. Do you have any tips on how this could be done, e.g. general guidelines on how to write a connector? Thanks, Lars

Does Flink SQL support count window functions?

2021-09-17 Thread casel.chen
I ran into a business scenario today that needs a count window. Looking at the Flink website, Flink SQL currently only supports time windows. Is SQL support for count windows planned? If not, how should I implement it myself? Could I write a custom window function, like the cumulate window introduced in Flink 1.13? Thanks!
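
Outside SQL, the DataStream API already offers `keyBy(...).countWindow(n)`; the underlying tumbling count-window semantics are simple to sketch in plain Java (class name hypothetical, not a Flink API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates tumbling count-window semantics: buffer elements and emit
// a window (the buffered batch) each time `size` elements have arrived.
// In Flink this is roughly what keyBy(...).countWindow(size) does per key.
public class CountWindowBuffer<T> {
    private final int size;
    private final List<T> buffer = new ArrayList<>();

    public CountWindowBuffer(int size) {
        this.size = size;
    }

    // Returns the completed window, or null if the window is not full yet.
    public List<T> add(T element) {
        buffer.add(element);
        if (buffer.size() == size) {
            List<T> window = new ArrayList<>(buffer);
            buffer.clear();
            return window;
        }
        return null;
    }
}
```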

Re: Re: Savepoints with bootstrapping a datastream function

2021-09-17 Thread Yun Gao
Hi Rakshit, I think FLIP-147 might still not be able to support this case: for bounded jobs, it supports each task exiting after a checkpoint to commit the remaining data, but it cannot ensure that all the tasks exit after the same checkpoint; for savepoints, it cannot support taking a

Re: Savepoints with bootstrapping a datastream function

2021-09-17 Thread Rakshit Ramesh
Hi Arvid. I went through the code, Confluence, and Jira on FLIP-147. I couldn't determine whether it's possible to manually trigger a savepoint/checkpoint, as I couldn't find any Javadoc APIs for it. Also, would the setting "ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH" still create a checkpoint if my

Re: How to mount custom volumes with Flink on native Kubernetes?

2021-09-17 Thread 东东
Upgrade to 1.13 and use a pod template; versions before that have no officially supported way. On 2021-09-17 16:43:53, "casel.chen" wrote: > To monitor TM OOM occurrences, we added the following parameters when launching the job: > -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError > -XX:HeapDumpPath=/var/log/oom.bin" > so that a heap dump file is generated on OOM for later analysis. > But since the TM's pod is destroyed on OOM, we want to mount a network volume to persist the heap dump file.

How to mount custom volumes with Flink on native Kubernetes?

2021-09-17 Thread casel.chen
To monitor TM OOM occurrences, we added the following parameters when launching the job: -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/oom.bin" so that a heap dump file is generated when an OOM occurs, for later analysis. But since the TM's pod is destroyed on OOM, we want to mount a network volume to persist the heap dump file. How can custom volumes be mounted with Flink on native Kubernetes? The Flink version in use is 1.12.5.

When will Flink SQL officially support Redis and MongoDB connectors?

2021-09-17 Thread casel.chen
Redis and MongoDB come up often in our work, but Flink has never provided official connectors for these two. When will they be officially released for everyone to use? PS: the bahir library has not been updated for a long time, and the Flink version it targets is too old.

Parsing SourceRecord with the Flink CDC DataStream API

2021-09-17 Thread Fisher Xiang
Hi, as the subject says: for the SourceRecords obtained by consuming the binlog with the DataStream API, the data are Struct-typed values in string form. Does the community have a good way to parse these string-form Struct data? Reflection? The goal is to parse them into Java objects. Sample data: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.01, pos=720, row=1, snapshot=true}}