Hello,
I've been trying to configure the starting offset position for a Flink Kafka
consumer so that, when there is no committed offset, it always starts at the
beginning. It seems like the typical way to do this would be setting
auto.offset.reset=earliest; however, I don't see that configuration property
in
Hi
As others said, state is different from a checkpoint. A checkpoint is just
a **snapshot** of the state, and you can restore from the previous
checkpoint if the job crashed.
State is for stateful computation, and checkpoints are for
fault tolerance [1].
The state keeps the information
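The state-vs-checkpoint distinction above can be illustrated with a toy sketch (pure Python, no Flink; all names hypothetical): state is the live data an operator accumulates, while a checkpoint is a point-in-time snapshot you can restore from.

```python
import copy

class CountingOperator:
    """A toy stateful operator: its state is a running count per key."""
    def __init__(self):
        self.state = {}  # live, mutable state used by the computation

    def process(self, key):
        self.state[key] = self.state.get(key, 0) + 1

    def checkpoint(self):
        # A checkpoint is just a snapshot (deep copy) of the state.
        return copy.deepcopy(self.state)

    def restore(self, snapshot):
        self.state = copy.deepcopy(snapshot)

op = CountingOperator()
for k in ["a", "b", "a"]:
    op.process(k)
snap = op.checkpoint()   # snapshot taken: {"a": 2, "b": 1}
op.process("a")          # state keeps evolving: {"a": 3, "b": 1}
op.restore(snap)         # after a "crash", roll back to the snapshot
print(op.state)          # {'a': 2, 'b': 1}
```

The state drives the computation; the checkpoint only exists so a failed job can rewind to it.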
Thanks Arvid,
I added static to ExecQueue and this did fix the problem. I tested without
static on RingBufferExec, because it seems that if ExecQueue is a static nested
class, there should be no reference to the MyKeyedProcessFunction object, as
RingBufferExec is an inner class of ExecQueue.
However,
Hi
By "exits abnormally easily", do you mean the container exits? You can check the JM/TM logs for related information; if there is nothing there, try checking from the YARN side why
the container exited.
Best,
Congxian
caozhen wrote on Mon, Oct 12, 2020 at 6:08 PM:
>
> Could you share the error logs produced by "after the application id is assigned, the container frequently exits abnormally"?
>
> Or check the error logs in the Flink client, as well as the logs in the yarn-historyserver.
>
>
>
> Dream-底限 wrote
> > hi
> >
Hi Team,
I have tried to assign a dynamic prefix for the file name, which contains
datetime components.
The problem is that the job always takes the initial datetime from when the job
first starts and never refreshes it later.
How can I get the dynamic current datetime in the filename at sink time?
.withPartPrefix
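The behavior described above — a datetime baked into the prefix once at submission time — comes down to eager vs. deferred evaluation, which can be shown without Flink at all (illustrative sketch; all names hypothetical):

```python
import datetime

def make_part_prefix_eager(now):
    # Evaluated once at "job start": every later file reuses the same string.
    return "part-" + now.strftime("%Y%m%d%H")

def make_part_prefix_deferred(clock):
    # A callable instead: re-evaluated at each sink invocation.
    return lambda: "part-" + clock().strftime("%Y%m%d%H")

job_start = datetime.datetime(2020, 10, 12, 9)
write_time = datetime.datetime(2020, 10, 12, 11)

eager = make_part_prefix_eager(job_start)
deferred = make_part_prefix_deferred(lambda: write_time)

print(eager)       # part-2020101209  (stuck at job-start time)
print(deferred())  # part-2020101211  (re-evaluated at write time)
```

In Flink itself, since the part prefix is a plain string fixed at job start, the usual route for time-dependent file paths is bucketing on the datetime (e.g. a custom BucketAssigner) rather than `.withPartPrefix`.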
After careful examination, it seems like it should be marked as @Internal, since
this class is located in the package
org.apache.flink.connector.jdbc.internal.connection.
Here is my PR related to this: https://github.com/apache/flink/pull/13603 .
Thanks a lot!
Kenzyme Le
--- Original Message
Hi,
I would like to know if the class
[SimpleJdbcConnectionProvider](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/connection/SimpleJdbcConnectionProvider.java)
should be marked as @Internal or
Hi,
You can use the API to set the configuration:
table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')
Settings in flink-conf.yaml only take effect when the job is submitted via flink
run; they are not picked up when running in a minicluster (python xxx.py).
Best,
hi,
Currently, when a stream table queries external dimension tables, multiple lookups against the external system are issued when several dimension tables are involved, which causes multiple network round trips. Will the community eventually support temporal-table subqueries? That is, when looking up the external system by a given key, instead of querying one specified table per lookup, allow a point lookup against a SQL subquery; that would reduce the network I/O.
Thanks for the feedback. I've created a JIRA here
https://issues.apache.org/jira/browse/FLINK-19589.
@Dan: This indeed would make it easier to set a lifetime property on
objects created by Flink, but actually if you want to apply it to all your
objects for a given bucket you can set bucket wide
A Flink application using Kafka topics as source and destination. Using
javaVersion = '1.11'
flinkVersion = '1.11.1'
scalaBinaryVersion = '2.11'
The application primarily uses the Flink SQL APIs. We have a StatementSet and
add SQL inserts to that set using addInsertSql.
When there are more insert
We use the StreamingFileSink. An option to expire files after some time
period would certainly be welcome. (I could probably figure out a way to do
this from the S3 admin UI too though)
On Sat, Oct 10, 2020 at 10:45 PM Padarn Wilson wrote:
> Hi Flink Users,
>
> We need to expose some additional
Hi Yun,
4) Yes, the interaction is not trivial and also I have not completely
thought it through. But in general, I'm currently at the point where I
think that we also need non-checkpoint related events in unaligned
checkpoints. So just keep that in mind, that we might converge anyhow at
this
Is there a way for us to change the module (in a reasonable way) that
would allow users to continue using it?
Is it an API problem, or one of semantics?
On 10/12/2020 4:57 PM, Kostas Kloudas wrote:
Hi Chesnay,
Unfortunately not from what I can see in the code.
This is the reason why I am
Hi mates!
I'm very new to pyflink and am trying to register a custom UDF function using
the Python API.
Currently I face an issue both in the server env and in my local IDE environment.
When I try to execute the example below I get an error message: The
configured Task Off-Heap Memory 0 bytes is less
Hi Arvid,
Many thanks for the insightful comments! I added the responses for this issue
under the quotes:
>> 1) You call the tasks that get the barriers injected leaf nodes, which would
>> make the sinks the root nodes. That is very similar to how graphs in
>> relational algebra are labeled.
Hi,
Are you trying to build a product of your own that displays the graph on the web? We just take the JSON values of the DAG and process them on the front end.
Hope this helps!
Best,
Hailong Wang
On 2020-10-12 18:15:36, "丁浩浩" <18579099...@163.com> wrote:
> I want to get the DAG graph shown on the Flink web UI. Is there any way to obtain it?
Yes, the specific logic is in YarnClusterDescriptor#startAppMaster, which keeps checking the app state.
If needed, you can add your own timeout check (older versions, e.g. 1.4.2, had this logic).
Best,
Hailong Wang
On 2020-10-12 17:17:44, "caozhen" wrote:
>
> Yes. When Flink on YARN starts and the requested container resources are insufficient, it will wait until resources are available.
>
>---
>
>
>guaishushu1...@163.com wrote
>> CliFrontend
I specify a separate configuration file directory for each streaming job on my side; not sure whether that meets your needs.
Sent from my iPhone
> On Oct 12, 2020 at 18:18, xiao cai wrote:
>
> Hi:
> The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics reporter, or different parameters, for each job
> (e.g. multiple custom k=v pairs for the Prometheus PushGateway), how can I configure that?
>
>
> Best xiao.
Hi Chesnay,
Unfortunately not from what I can see in the code.
This is the reason why I am opening a discussion. I think that if we
supported backwards compatibility, this would have been an easier
process.
Kostas
On Mon, Oct 12, 2020 at 4:32 PM Chesnay Schepler wrote:
>
> Are older versions
Are older versions of the module compatible with 1.12+?
On 10/12/2020 4:30 PM, Kostas Kloudas wrote:
Hi all,
As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink is deprecated
Hi all,
As the title suggests, this thread is to discuss the removal of the
flink-connector-filesystem module which contains (only) the deprecated
BucketingSink. The BucketingSink is deprecated since Flink 1.9 [1] in
favor of the relatively recently introduced StreamingFileSink.
For the sake of a
Hi Dian, thx for your reply!
I was wondering about replacing UDFs on the fly in Flink; of course I'm pretty
sure that it's possible to implement the update logic directly in Python, thx
for the idea.
Regards,
Rinat
On Mon, Oct 12, 2020 at 14:20, Dian Fu wrote:
> Hi Rinat,
>
> Do you want to replace the UDFs with
Thanks for your replies.
When I use no state-relevant code in my program, the checkpoint can be saved
and resumed.❶
So then why do we need Keyed State/Operator State/Stateful Functions?❷
"the operators are reset to the time of the respective checkpoint."
We already have met the requirement: "resume
1. How should the parallelism of each operator in a Flink job generally be designed? For example, what should the parallelism of map or source be set to? Is there an algorithm for this?
2. Tasks are only assigned to the next TaskManager once a TaskManager's slots are used up. Is the following the reason behind this design?
From the official docs:
By adjusting the number of slots, users can decide how subtasks are isolated. One slot per TaskManager means each group of tasks runs in a separate
JVM (for example, started in a separate container). Multiple slots mean multiple subtasks share the same JVM. Tasks in the same JVM share
TCP
My understanding:
Stateless operations such as print and map store no data.
Stateful operations such as window store only the data inside the window.
For stateful operations such as groupby, the more keys there are, the more data is stored; by default it is not cleaned up, but a cleanup policy can be configured.
---
My data comes from a Kafka source; after receiving it I register it as a table. I'd like to know: does the data in a table created this way keep being appended?
Will it stay around and make memory usage grow indefinitely? How do I clean up expired data?
The table-registration code is as follows:
// get the order-callback Kafka data
DataStreamSource
--
Sent from:
Hi Rinat,
Do you want to replace the UDFs with new ones on the fly or just want to update
the model which could be seen as instance variables inside the UDF?
For the former case, it's not supported AFAIK.
For the latter case, I think you could just update the model in the UDF
periodically or
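The "update the model periodically inside the UDF" idea in the latter case can be sketched as follows (pure Python, no Flink or Beam; all names hypothetical):

```python
import time

class ModelUdf:
    """A UDF-like object that reloads its model at most every `interval_s` seconds."""
    def __init__(self, load_model, interval_s=60.0, clock=time.monotonic):
        self._load_model = load_model
        self._interval_s = interval_s
        self._clock = clock
        self._model = load_model()        # model held as an instance variable
        self._loaded_at = clock()

    def eval(self, value):
        now = self._clock()
        if now - self._loaded_at >= self._interval_s:
            self._model = self._load_model()   # periodic refresh, no job restart
            self._loaded_at = now
        return self._model(value)

# Fake "model": a multiplier we can swap out between reloads.
factor = [2]
udf = ModelUdf(lambda: (lambda x, f=factor[0]: x * f),
               interval_s=0.0)   # reload on every call, just for the demo
print(udf.eval(10))   # 20
factor[0] = 3
print(udf.eval(10))   # 30  (new model picked up on reload)
```

The same shape works inside a real UDF's eval method: check the clock, reload the model artifact if the interval has elapsed, then score.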
I want to get the DAG graph shown on the Flink web UI. Is there any way to obtain it?
Each job can simply read its own configuration at startup, right?
熊云昆
Email: xiongyun...@163.com
Signature customized by NetEase Mail Master
On Oct 12, 2020 at 18:17, xiao cai wrote:
Hi:
The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics reporter, or different parameters, for each job
(e.g. multiple custom k=v pairs for the Prometheus PushGateway), how can I configure that?
Best xiao.
Hi Austin,
your explanation for the KeyedProcessFunction implementation sounds good
to me. Using the time and state primitives for this task will make the
implementation more explicit but also more readable.
Let me know if you could solve your use case.
Regards,
Timo
On 09.10.20 17:27,
You can try whether this works: when starting each job, set that job's metrics via -D k=v parameters.
---
xiao cai wrote
> Hi:
> The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics reporter, or different parameters, for each job
> (e.g. multiple custom k=v pairs for the Prometheus PushGateway), how can I configure that?
>
>
> Best xiao.
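The `-D k=v` suggestion works because each pair becomes a per-job configuration entry. A minimal sketch of that idea (illustrative only — this is not Flink's actual argument parser):

```python
def parse_dynamic_properties(args):
    """Collect -D key=value pairs from a command line into a per-job config dict."""
    config = {}
    it = iter(args)
    for arg in it:
        if arg == "-D":
            pair = next(it)          # "-D key=value" form: value is the next arg
        elif arg.startswith("-D"):
            pair = arg[2:]           # "-Dkey=value" form
        else:
            continue                 # not a dynamic property, skip
        key, _, value = pair.partition("=")   # split at the FIRST '=' only
        config[key] = value
    return config

argv = ["-Dmetrics.reporter.promgateway.jobName=jobA",
        "-D", "metrics.reporter.promgateway.groupingKey=k=v"]
print(parse_dynamic_properties(argv))
# {'metrics.reporter.promgateway.jobName': 'jobA',
#  'metrics.reporter.promgateway.groupingKey': 'k=v'}
```

Note the `partition("=")`: splitting at the first `=` is what lets a value itself contain `k=v` pairs, as in the PushGateway groupingKey case above.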
Hi:
The known way to set a metrics reporter is in conf/flink-conf.yaml. If I want to set a different metrics reporter, or different parameters, for each job
(e.g. multiple custom k=v pairs for the Prometheus PushGateway), how can I configure that?
Best xiao.
Yes. When Flink on YARN starts and the requested container resources are insufficient, it will wait until resources are available.
---
guaishushu1...@163.com wrote
> When CliFrontend submits a job to YARN, can reasons such as insufficient resources leave the submission process stuck until resources are released?
>
>
> guaishushu1103@
Could you share the error logs produced by "after the application id is assigned, the container frequently exits abnormally"?
Or check the error logs in the Flink client, as well as the logs in the yarn-historyserver.
Dream-底限 wrote
> hi
> I'm running jobs with flink1.11.1 on
> YARN in detached mode, but at submission time, after the application id is assigned, the container frequently exits abnormally. I first thought it was a YARN environment issue, but I've hit this on two clusters. Is this a known bug?
Maybe test with a different Kafka topic for the sink... I think it might be a problem on the Kafka side; as a newbie that's all I can guess.
-----Original Message-----
From: Yang Peng [mailto:yangpengklf...@gmail.com]
Sent: Wednesday, September 30, 2020 18:00
To: user-zh
Subject: Re: Re: Flink consuming Kafka: job output suddenly dropped from hundreds of thousands per second to tens of thousands per second
Thanks for the reply. After the job was restarted we can't see the in/out metric data. We can also see that the number of lookup queries on the Redis connection this job depends on dropped, as if the job were in a zombie state.
How is "the authenticated Kafka uses BBB.keytab" configured? Is it a kafkaSink you implemented yourself?
Hi Arvid, thx for your reply.
We are already using the approach with control streams to propagate
business rules through our data-pipeline.
Because all our models are powered by Python, I'm going to use Table API
and register UDF functions, where each UDF is a separate model.
So my question is
Hi Jeff
Sorry for the late reply. You can only restore from a checkpoint in which
there is a _metadata file in the chk-xxx directory. If there is no _metadata in
the chk-xxx directory, that means the chk-xxx is not complete, and you can't
restore from it.
Best,
Congxian
Jeffrey Martin wrote on Tue, Sep 15, 2020
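The completeness rule above is easy to script: only chk-xxx directories that contain a `_metadata` file are restorable. A sketch, assuming a local checkpoint directory layout (the directory names here are just the demo's own):

```python
import os
import tempfile

def restorable_checkpoints(checkpoint_root):
    """Return the chk-* subdirectories that contain a _metadata file."""
    result = []
    for name in sorted(os.listdir(checkpoint_root)):
        path = os.path.join(checkpoint_root, name)
        if (name.startswith("chk-")
                and os.path.isdir(path)
                and os.path.isfile(os.path.join(path, "_metadata"))):
            result.append(name)
    return result

# Demo: chk-2 is complete (has _metadata), chk-1 is not.
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "chk-1"))
os.makedirs(os.path.join(root, "chk-2"))
open(os.path.join(root, "chk-2", "_metadata"), "w").close()
print(restorable_checkpoints(root))   # ['chk-2']
```

For checkpoints on HDFS or S3 the same check applies; only the directory-listing call changes.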
In that case, would an inner join work? If the employees in the orders table are guaranteed to be in the employee dimension table, it would count all orders produced by all employees today.
---
夜思流年梦 wrote
> It's like this: for example, to count today's order totals for all employees, if the orders table LEFT JOINs the employee table, employees with no orders today cannot appear in the result set;
> putting the employee table on the left and LEFT JOINing the orders table makes today's order count appear for every employee.
>
>
>
>
>
> On 2020-10-12 15:17:07, "caozhen"
> caozhen1937@
> wrote:
>>
>> I think in this scenario
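The join direction being debated can be shown with plain dictionaries (an illustrative sketch, not Flink SQL; sample data hypothetical): driving from the employee side keeps employees with zero orders, while driving from the orders side drops them.

```python
from collections import Counter

employees = ["alice", "bob", "carol"]                        # dimension table
orders = [("o1", "alice"), ("o2", "alice"), ("o3", "bob")]   # today's orders

per_employee = Counter(emp for _, emp in orders)

# employees LEFT JOIN orders: every employee appears, zero counts included.
left_join = {emp: per_employee.get(emp, 0) for emp in employees}
print(left_join)       # {'alice': 2, 'bob': 1, 'carol': 0}

# orders LEFT JOIN employees (or an inner join): carol disappears.
orders_driven = dict(per_employee)
print(orders_driven)   # {'alice': 2, 'bob': 1}
```

This is why the thread keeps coming back to which side is on the left: only the employee-driven join can report a zero for an employee with no orders today.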
Hi
Thanks for the reply, I'll take a look.
superainbower
superainbo...@163.com
On Oct 12, 2020 at 17:09, Congxian Qiu wrote:
Hi
From the error logs, it looks like a problem with the filesystem-related configuration (or the jar packages). You can check this mailing-list thread [1] to see whether it solves your problem.
[1]
http://apache-flink.147419.n8.nabble.com/Flink-1-11-1-on-k8s-hadoop-td5779.html#a5834
Best,
Congxian
Hi Superainbower,
could you share the complete logs with us? They contain which Flink version
you are using and also the classpath you are starting the JVM with. Have
you tried whether the same problem occurs with the latest Flink version?
Cheers,
Till
On Mon, Oct 12, 2020 at 10:32 AM
Hi
From the error logs, it looks like a problem with the filesystem-related configuration (or the jar packages). You can check this mailing-list thread [1] to see whether it solves your problem.
[1]
http://apache-flink.147419.n8.nabble.com/Flink-1-11-1-on-k8s-hadoop-td5779.html#a5834
Best,
Congxian
superainbower wrote on Wed, Sep 30, 2020 at 3:04 PM:
> To add on, my error log:
> Caused by:
Hi @Storm, which Flink version are you using, and what is the stack trace? Please reply with the relevant information here and we can look into the problem together.
Best,
Congxian
大森林 wrote on Sat, Oct 10, 2020 at 1:05 PM:
> I'm on an old JDK 8 here; it has nothing to do with jdk261.
>
>
>
>
> ----- Original Message -----
> From:
> "user-zh"
>
Hi, I also considered a stream-stream join at first, but a stream-stream join
runs into a problem: the result set will only contain employees that have orders today, so employees without orders are not reflected in the result set. The main requirement is today's order count for all employees.
On 2020-10-12 15:37:51, "Jark Wu" wrote:
> I think the OP's scenario is not a temporal-join scenario but a stream-stream join
> scenario, because a change in either stream should trigger an update of the result, so using the employee table as the right-side dimension table won't work.
>
> If my understanding is right, you can use flink-cdc-connectors [1] to connect the employee and order
Hi community,
I have uploaded the log files of JobManager and TaskManager-1-1 (one of the
50 TaskManagers) with DEBUG log level and default Flink configuration, and
it clearly shows that TaskManager failed to register with JobManager after
10 attempts.
Here is the link:
JobManager:
Hi Padarn,
sounds like a good addition to me. We could wait for more feedback or you
could start immediately.
The next step would be to create a JIRA and get it assigned to you.
Looking forward to your contribution
Arvid
On Sun, Oct 11, 2020 at 7:45 AM Padarn Wilson wrote:
> Hi Flink Users,
It's like this: for example, to count today's order totals for all employees, if the orders table LEFT JOINs the employee table, employees with no orders today cannot appear in the result set;
putting the employee table on the left and LEFT JOINing the orders table makes today's order count appear for every employee.
On 2020-10-12 15:17:07, "caozhen" wrote:
>
> I think in this scenario there's no problem with the employee dimension table being on the right.
>
> Which fields do you need to fetch from the employee dimension table during the join?
>
>
>
> 夜思流年梦 wrote
>> There is a scenario: an employee dimension table and an orders table (a stream table listening to the MySQL
Hi Rinat,
Which API are you using? If you use datastream API, the common way to
simulate side inputs (which is what you need) is to use a broadcast. There
is an example on SO [1].
[1]
Hi Till,
Could you tell me how to configure HDFS as the state backend when I deploy Flink on
k8s?
I tried adding the following to flink-conf.yaml:
state.backend: rocksdb
state.checkpoints.dir: hdfs://slave2:8020/flink/checkpoints
state.savepoints.dir: hdfs://slave2:8020/flink/savepoints
Hi community,
Recently we have noticed a strange behavior for Flink jobs on Kubernetes
per-job mode: when the parallelism increases, the time it takes for the
TaskManagers to register with *JobManager *becomes abnormally long (for a
task with parallelism of 50, it could take 60 ~ 120 seconds or
Hi Yun,
Thank you for starting the discussion. This will solve one of the
long-standing issues [1] that confuse users. I'm also a big fan of option
3. It is also a bit closer to Chandy-Lamport again.
A couple of comments:
1) You call the tasks that get the barriers injected leaf nodes, which
I think the OP's scenario is not a temporal-join scenario but a stream-stream join
scenario, because a change in either stream should trigger an update of the result, so using the employee table as the right-side dimension table won't work.
If my understanding is right, you can use flink-cdc-connectors [1] to connect the employee and order binlog streams, join them
directly, and then aggregate the order counts. Pseudocode:
create table users (
user_id bigint,
...
) with (
connector = mysql-cdc
...
);
create table orders (
order_id
The error message says there is no permission to read core-site.xml. Have you checked the file permissions of core-site.xml?
I think in this scenario there's no problem with the employee dimension table being on the right.
Which fields do you need to fetch from the employee dimension table during the join?
夜思流年梦 wrote
> There is a scenario: an employee dimension table and an orders table (a stream table listening to the MySQL binlog); we want to compute every employee's order count in real time.
> Currently flink-sql supports joining temporal tables, but the official documentation says: only inner and left joins with processing-time temporal
> tables are supported.
> This scenario requires the dimension table on the left, but in practice a left join with the dimension table on the left fails with: ClassCastException:
>
hi leiyanrui,
I see now, thank you very much!!!
On Oct 12, 2020 at 15:05, leiyanrui<1150693...@qq.com> wrote:
The further KeyedProcessFunction step keys by the window end time, so there is only one key and the pre-aggregated data gets aggregated once more.
The further KeyedProcessFunction step keys by the window end time, so there is only one key and the pre-aggregated data gets aggregated once more.
No worries :)
Thank you~
Xintong Song
On Mon, Oct 12, 2020 at 2:48 PM Paul Lam wrote:
> Sorry for the misspelled name, Xintong
>
> Best,
> Paul Lam
>
> On Oct 12, 2020 at 14:46, Paul Lam wrote:
>
> Hi Xingtong,
>
> Thanks a lot for the pointer!
>
> It’s good to see there would be a new IO executor
Hi leiyanrui,
You're right; my modified code indeed corresponds to the pv of each behavior [facepalm]. Thank you very much for the helpful answer!
Since I'm not very familiar with Flink yet, I'd like to ask one more question: why does the original demo still apply a KeyedProcessFunction to the aggregated window data? (figure 1) I thought I could already get the pv data for the whole window at the aggregate-function step.
On Oct 12, 2020 at 14:36, leiyanrui<1150693...@qq.com> wrote:
keyby(_behavior): the four results you see should be the pv of each behavior, not the pv of the whole window.
Hi 大森林,
You can always resume from checkpoints independent of the usage of keyed or
non-keyed state of operators.
One checkpoint contains the state of all operators at a given point in time.
Each operator may have keyed state, raw state, or non-keyed state.
As long as you are not changing the
Sorry for the misspelled name, Xintong
Best,
Paul Lam
> On Oct 12, 2020 at 14:46, Paul Lam wrote:
>
> Hi Xingtong,
>
> Thanks a lot for the pointer!
>
> It’s good to see there would be a new IO executor to take care of the TM
> contexts. Looking forward to the 1.12 release!
>
> Best,
> Paul Lam
>
>>
Hi Xingtong,
Thanks a lot for the pointer!
It’s good to see there would be a new IO executor to take care of the TM
contexts. Looking forward to the 1.12 release!
Best,
Paul Lam
> On Oct 12, 2020 at 14:18, Xintong Song wrote:
>
> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM
Hi Xingbo ! Thx a lot for such a detailed reply, it is very useful.
On Mon, Oct 12, 2020 at 09:32, Xingbo Huang wrote:
> Hi,
> I will do my best to provide pyflink related content, I hope it helps you.
>
> >>> each udf function is a separate process, that is managed by Beam (but
> I'm not sure I got
Hi Vijay,
If you implement the SinkFunction yourself, you can share the
OkHttpClient.Builder across all instances in the same taskmanager by using
a static field and initializing it only once (ideally in
RichSinkFunction#open).
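The "one shared client per process, initialized once" pattern suggested above can be sketched outside Java too (pure Python stand-in; `HttpClient` here is a hypothetical placeholder for an expensive client such as OkHttpClient):

```python
import threading

class HttpClient:
    """Hypothetical stand-in for an expensive, thread-safe client object."""
    instances_created = 0
    def __init__(self):
        HttpClient.instances_created += 1

_client = None
_lock = threading.Lock()

def shared_client():
    """Lazily create exactly one client per process, shared by all sinks."""
    global _client
    if _client is None:
        with _lock:              # double-checked locking: cheap fast path,
            if _client is None:  # lock only taken on first initialization
                _client = HttpClient()
    return _client

class Sink:
    def open(self):
        # Analogous to fetching the shared client in RichSinkFunction#open.
        self.client = shared_client()

sinks = [Sink() for _ in range(4)]
for s in sinks:
    s.open()
print(HttpClient.instances_created)   # 1
```

All four sink instances end up holding the same client object, which is the point of keeping it in process-wide (in Java: static) scope.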
On Tue, Oct 6, 2020 at 9:37 AM Aljoscha Krettek wrote:
> Hi,
>
>
FYI, I just created FLINK-19568 for tracking this issue.
Thank you~
Xintong Song
[1] https://issues.apache.org/jira/browse/FLINK-19568
On Mon, Oct 12, 2020 at 2:18 PM Xintong Song wrote:
> Hi Paul,
>
> Thanks for reporting this.
>
> Indeed, Flink's RM currently performs several HDFS
keyby(_behavior): the four results you see should be the pv of each behavior, not the pv of the whole window.
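The point about keyBy on behavior producing one count per behavior (hence four results) rather than one per window can be illustrated with plain Python (no Flink; sample data hypothetical):

```python
from collections import Counter

# One hour of events in the same window: (behavior, user_id)
events = [("pv", 1), ("pv", 2), ("buy", 1), ("cart", 3), ("pv", 4), ("fav", 2)]

# keyBy(behavior): one independent aggregate per key,
# so the window emits four separate results.
per_behavior = Counter(behavior for behavior, _ in events)
print(dict(per_behavior))    # {'pv': 3, 'buy': 1, 'cart': 1, 'fav': 1}

# A single total for the whole window requires aggregating across all keys
# (in Flink terms: a second keyBy on the window end time, as in the demo).
total = sum(per_behavior.values())
print(total)                 # 6
```

This mirrors the explanation in the thread: the per-key window results are correct on their own; the extra KeyedProcessFunction exists only to merge them into one window-wide figure.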
Hi Omkar,
I don't see anything suspicious in regards to how Flink handles
checkpointing; it simply took longer than 10m (configured checkpointing
timeout) to checkpoint.
The usual reason for long checkpointing times is backpressure. And indeed
looking at your thread dump, I see that you have a
Hi,
I will do my best to provide pyflink related content, I hope it helps you.
>>> each udf function is a separate process, that is managed by Beam (but
I'm not sure I got it right).
Strictly speaking, it is not true that every UDF is in a different python
process. For example, the two python
Hi Sysuke,
On Oct 12, 2020 at 14:14, Lee Sysuke wrote:
Hi, could you paste the code of the aggregateFunction and the ProcessFunction?
Natasha <13631230...@163.com> wrote on Mon, Oct 12, 2020 at 2:11 PM:
Hi all,
Having just started with Flink, I recently picked a few user-behavior-analysis demos from GitHub to learn from.
1. But there is one problem I can't figure out: (figure 1)
Hi Paul,
Thanks for reporting this.
Indeed, Flink's RM currently performs several HDFS operations in the rpc
main thread when preparing the TM context, which may block the main thread
when HDFS is slow.
Unfortunately, I don't see any out-of-box approach that fixes the problem
at the moment,
Hi, could you paste the code of the aggregateFunction and the ProcessFunction?
Natasha <13631230...@163.com> wrote on Mon, Oct 12, 2020 at 2:11 PM:
>
> Hi all,
> Having just started with Flink, I recently picked a few user-behavior-analysis demos from GitHub to learn from.
> 1. But there is one problem I can't figure out: (figure 1)
>
> As shown, I set the user's visit time as the EventTime. My idea was: if I set a one-hour tumbling window, then in principle the result I should get is
> that the total pv of all visits within that hour is returned to me
>
Hi all,
Having just started with Flink, I recently picked a few user-behavior-analysis demos from GitHub to learn from.
1. But there is one problem I can't figure out: (figure 1)
As shown, I set the user's visit time as the EventTime. My idea was: if I set a one-hour tumbling window, then in principle all the pv within that hour should come back to me as one total. So why does the console print four identical timestamps but four separate pv counts of 41890, 992, 1474, and 2539? (figure 2)
2.