Re: Question about the logic of flink keyBy
Hi.

Records are distributed by the hash of their key into a fixed number of key groups. Roughly speaking, it works much like a HashMap. Jinzhu wrote an article that explains this in detail [1]. If you want to read the implementation, KeyGroupStreamPartitioner is a good entry point for seeing how the Table layer does it.

Best,
Shengkai

[1] https://developer.aliyun.com/article/667562

Peihui He wrote on Sun, May 29, 2022 at 11:55:
> Hi, all
>
> A question for everyone: after a keyBy in Flink, I use process to handle the data.
> My concern: when the keys are unbounded, e.g. UUIDs, will the downstream create
> one process object per key? If so, wouldn't it run out of memory (OOM) before long?
>
> Is anyone familiar with the relevant Flink source code? I'd appreciate pointers
> so I can investigate it myself.
>
> Best Regards!
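The key-group assignment described above can be sketched in plain Java. This is a simplified illustration, not Flink code: the real KeyGroupRangeAssignment applies a murmur hash on top of hashCode(), which is omitted here, so the exact group ids will differ from a real job. The constants (maxParallelism = 128, parallelism = 4) are example values.

```java
// Sketch of Flink's key -> key group -> subtask routing (simplified:
// the murmur hash Flink applies on top of hashCode() is left out).
public class KeyGroupSketch {

    // keyGroupId = hash(key) % maxParallelism
    // (maxParallelism is the fixed number of key groups)
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    // Each parallel subtask owns a contiguous range of key groups.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default
        int parallelism = 4;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int group = assignToKeyGroup(key, maxParallelism);
            int subtask = operatorIndexForKeyGroup(maxParallelism, parallelism, group);
            System.out.println(key + " -> keyGroup " + group + " -> subtask " + subtask);
        }
    }
}
```

However many distinct keys arrive, they always fall into one of the maxParallelism key groups, which is what makes the routing similar to a HashMap with a fixed bucket count.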
Re: Question about the logic of flink keyBy
The number of downstream process operator instances is determined by the parallelism, not by how many distinct keys your data contains.

Peihui He wrote on Sun, May 29, 2022 at 11:55:
> Hi, all
>
> A question for everyone: after a keyBy in Flink, I use process to handle the data.
> My concern: when the keys are unbounded, e.g. UUIDs, will the downstream create
> one process object per key? If so, wouldn't it run out of memory (OOM) before long?
>
> Is anyone familiar with the relevant Flink source code? I'd appreciate pointers
> so I can investigate it myself.
>
> Best Regards!
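The point above can be demonstrated with the routing arithmetic alone. This is a hedged sketch, not Flink code (plain hashCode() stands in for Flink's murmur hash, and 4/128 are example parallelism values): no matter how many unbounded UUID keys flow through, they land on at most `parallelism` operator instances.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ParallelismVsKeys {
    // Route a key to a subtask with the same arithmetic Flink uses
    // (murmur hash omitted for brevity).
    static int subtaskFor(String key, int parallelism, int maxParallelism) {
        int keyGroup = Math.abs(key.hashCode() % maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Count how many distinct subtasks a stream of random keys touches.
    static int distinctSubtasks(int numKeys, int parallelism, int maxParallelism) {
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < numKeys; i++) {
            used.add(subtaskFor(UUID.randomUUID().toString(), parallelism, maxParallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        // 10,000 distinct keys, but never more than 4 operator instances.
        System.out.println("subtasks used: " + distinctSubtasks(10_000, 4, 128));
    }
}
```

So new keys do not create new process operator objects; they are multiplexed onto the fixed set of parallel subtasks.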
Re: Custom restart strategy
Hi. Maybe a metric reporter [1] is suitable for your case.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/

unknown unknown wrote on Sat, May 28, 2022 at 12:49:
> Thanks Shengkai! Unfortunately, this would require querying the status of
> each job continuously. Given that very few pipelines experience failures, and
> those are far in between, I am looking for a push-based model vs. polling.
>
> Thanks
> AK
>
> On Thu, May 26, 2022 at 7:21 PM Shengkai Fang wrote:
>
>> Hi.
>>
>> I think you can use the REST API [1] to fetch the job status from the
>> JM periodically to detect whether something has happened. The REST API
>> also supports fetching the exception list for a specified job [2].
>>
>> Best,
>> Shengkai
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs
>> [2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-exceptions
>>
>> unknown unknown wrote on Thu, May 26, 2022 at 23:06:
>>
>>> Hello Users!
>>>
>>> I would like to notify an external endpoint when a streaming job has
>>> a certain number of restarts. While I can use a service to continuously
>>> *poll* Flink metrics and identify failing jobs, I am looking to
>>> invert the action and have the job notify. We have around ~50 streaming
>>> jobs, and it gets challenging to query them on a continuous basis.
>>>
>>> Looking into [1], the intrusive way would be to perform the action at [2]
>>> (not tested, though). Happy to hear suggestions and alternatives.
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#restart-strategies
>>> [2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FixedDelayRestartBackoffTimeStrategy.java#L68
>>>
>>> Thanks
>>> AK.
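A push-based reporter is configured in flink-conf.yaml. The fragment below is a sketch only: the reporter name "restart_alerts", the PushGateway host, and the interval are assumptions, and the factory class should be checked against the metric reporters page [1] for your Flink version. Any reporter that pushes (PushGateway, InfluxDB, etc.) lets an external system alert on the numRestarts metric without polling the JobManager.

```yaml
# flink-conf.yaml -- hedged sketch of a push-based metric reporter.
# Names and hosts below are placeholders; verify the factory class
# against the docs for your Flink version.
metrics.reporter.restart_alerts.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.restart_alerts.hostUrl: http://pushgateway.example.com:9091
metrics.reporter.restart_alerts.jobName: flink-jobs
metrics.reporter.restart_alerts.interval: 30 SECONDS
```

With this in place, the alerting rule (e.g. "numRestarts increased by N") lives in the monitoring system rather than in each of the ~50 jobs.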
Re: GlobalCommitter in Flink's two-phase commit
Hi,

1. What are the general usage scenarios of GlobalCommitter?
- GlobalCommitter is used for creating and committing an aggregated committable. It is part of a two-phase-commit protocol. One use case is the compaction of small files.

2. Why should GlobalCommitter be removed in the new version of the API?
- As FLIP-191 describes, there are many different requirements from different downstream systems, e.g. Iceberg, Delta Lake, and Hive. One GlobalCommitter could not cover all of them. If you take a look at the SinkV1Adapter source code, you will see that StandardSinkTopologies#addGlobalCommitter, which is recommended to replace the usage of GlobalCommitter, takes care of the post-commit topology.

Best regards,
Jing

On Tue, May 24, 2022 at 9:11 AM di wu <676366...@qq.com> wrote:
> Hello
> Regarding the GlobalCommitter in Flink's two-phase commit:
> I see it was introduced in FLIP-143, but it seems to have been removed
> again in FLIP-191 and marked as Deprecated in the source code.
> I haven't found any relevant information about the use of GlobalCommitter.
>
> There are two questions I would like to ask:
> 1. What are the general usage scenarios of GlobalCommitter?
> 2. Why should GlobalCommitter be removed in the new version of the API?
>
> Thanks && Regards,
> di.wu
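The "aggregated committable" idea can be sketched without any Flink dependency. This is a conceptual illustration, not the Sink V2 API: the class, record, and method names are invented for the example. Each parallel subtask pre-commits its own file handles (phase 1), and a single global step combines them into one committable that is committed atomically (phase 2), which is exactly the small-file-compaction use case mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of a global commit: many per-subtask committables,
// one aggregated commit. Names here are illustrative, not Flink API.
public class GlobalCommitSketch {

    record FileCommittable(String path, long sizeBytes) {}

    // Phase 1: each subtask pre-commits the files it wrote.
    static List<FileCommittable> subtaskCommittables(int subtask) {
        return List.of(new FileCommittable("part-" + subtask + "-0", 1024),
                       new FileCommittable("part-" + subtask + "-1", 2048));
    }

    // Phase 2: the global committer sees all committables and commits once,
    // e.g. by writing a single manifest that makes every file visible atomically.
    static String globalCommit(List<FileCommittable> all) {
        long total = all.stream().mapToLong(FileCommittable::sizeBytes).sum();
        return "manifest(" + all.size() + " files, " + total + " bytes)";
    }

    public static void main(String[] args) {
        List<FileCommittable> all = new ArrayList<>();
        for (int subtask = 0; subtask < 3; subtask++) {
            all.addAll(subtaskCommittables(subtask));
        }
        System.out.println(globalCommit(all)); // one atomic commit for all 6 files
    }
}
```

Systems such as Iceberg or Delta Lake each need a different shape for this global step, which is why FLIP-191 replaces the single GlobalCommitter with a customizable post-commit topology.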