Re: flink key by 逻辑疑问

2022-05-29 Thread Shengkai Fang
Hi.

会根据 key 的 hash 值分配到固定个数的 keygroup 之中的。简单来说,跟HashMap>
有点相似。金竹老师有一篇文章详细解释了[1]。

如果想看实现的话,可以从 KeyGroupStreamPartitioner 入手来看看 Table 层是怎么做的。

Best,
Shengkai

[1] https://developer.aliyun.com/article/667562

Peihui He  于2022年5月29日周日 11:55写道:

> Hi, all
>
> 请教下大家,flink key by 后 使用process 来处理数据。现在有个问题:
> 当key不限量的情况下,比如uuid,这种情况下,下游都会创建一个process 对象来处理数据不?
> 如果这样的话,是不是没多久就会oom呢?
>
> 大家有熟悉这块相关flink 源码不?求指导,想自己观察下。
>
> Best Regards!
>


Re: flink key by 逻辑疑问

2022-05-29 Thread yidan zhao
下游的process算子数取决于并行度,parallelism。而不是你的数据key有多少。

Peihui He  于2022年5月29日周日 11:55写道:
>
> Hi, all
>
> 请教下大家,flink key by 后 使用process 来处理数据。现在有个问题:
> 当key不限量的情况下,比如uuid,这种情况下,下游都会创建一个process 对象来处理数据不?
> 如果这样的话,是不是没多久就会oom呢?
>
> 大家有熟悉这块相关flink 源码不?求指导,想自己观察下。
>
> Best Regards!


Re: Custom restart strategy

2022-05-29 Thread Shengkai Fang
Hi.

Maybe the metric reporter[1] is suitabe for your case.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/

unknown unknown  于2022年5月28日周六 12:49写道:

> Thanks Shengkai! Unfortunately, this would require querying status for
> each job continuously. Given very few pipelines experience failures and
> they are far in-between, I am looking for a push based model vs polling.
>
> Thanks
> AK
>
> On Thu, May 26, 2022 at 7:21 PM Shengkai Fang  wrote:
>
>> Hi.
>>
>> I think you can use REST OPEN API to fetch the job status from the
>> JM periodically to detect whether something happens. Currently REST OPEN
>> API also supports to fetch the exception list for the specified job[2].
>>
>> Best,
>> Shengkai
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid-exceptions
>>
>> unknown unknown  于2022年5月26日周四 23:06写道:
>>
>>> Hello Users!
>>>
>>> I would like to notify an external endpoint when a streaming job has
>>> a certain number of restarts. While I can use a service to continuously
>>> *poll* Flink metrics and identify failing jobs, I am looking to
>>> inverse the action and have the job notify. We have around ~50 streaming
>>> jobs and it gets challenging querying on a continuous basis.
>>>
>>> Looking into [1], the intrusive way was to perform the action at [2]
>>> (not tested though) Happy to hear suggestions and alternatives ?
>>>
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/#restart-strategies
>>>
>>>
>>> [2]
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FixedDelayRestartBackoffTimeStrategy.java#L68
>>>
>>>
>>> Thanks
>>> AK.
>>>
>>


Re: GlobalCommitter in Flink's two-phase commit

2022-05-29 Thread Jing Ge
Hi,

1. What are the general usage scenarios of GlobalCommitter?
- GlobalCommitter is used for creating and committing an aggregated
committable. It is part of a 2-phase-commit protocol. One use case is the
compaction of small files.

2. Why should GlobalCommitter be removed in the new version of the api?
- As FLIP-191 described, there are many different requirement from
different downstream systems, e.g. Iceberg, Delta lake, Hive. One
GlobalCommitter could not cover all of them. If you take a look at the
SinkV1Adapter source code, you will see that
StandardSinkTopologies#addGlobalCommitter, which is recommended to replace
the usage of GlobalCommitter, is used to take care of the  post commit
topology.

Best regards,
Jing

On Tue, May 24, 2022 at 9:11 AM di wu <676366...@qq.com> wrote:

> Hello
> Regarding the GlobalCommitter in Flink's two-phase commit,
> I see it was introduced in FLIP-143, but it seems to have been removed
> again in FLP-191 and marked as Deprecated in the source code.
> I haven't found any relevant information about the use of GlobalCommitter.
>
> There are two questions I would like to ask:
> 1. What are the general usage scenarios of GlobalCommitter?
> 2. Why should GlobalCommitter be removed in the new version of the api?
>
> Thanks && Regards,
> di.wu
>
>


Re: GlobalCommitter in Flink's two-phase commit

2022-05-29 Thread Jing Ge
Hi,

1. What are the general usage scenarios of GlobalCommitter?
- GlobalCommitter is used for creating and committing an aggregated
committable. It is part of a 2-phase-commit protocol. One use case is the
compaction of small files.

2. Why should GlobalCommitter be removed in the new version of the api?
- As FLIP-191 described, there are many different requirement from
different downstream systems, e.g. Iceberg, Delta lake, Hive. One
GlobalCommitter could not cover all of them. If you take a look at the
SinkV1Adapter source code, you will see that
StandardSinkTopologies#addGlobalCommitter, which is recommended to replace
the usage of GlobalCommitter, is used to take care of the  post commit
topology.

Best regards,
Jing

On Tue, May 24, 2022 at 9:11 AM di wu <676366...@qq.com> wrote:

> Hello
> Regarding the GlobalCommitter in Flink's two-phase commit,
> I see it was introduced in FLIP-143, but it seems to have been removed
> again in FLP-191 and marked as Deprecated in the source code.
> I haven't found any relevant information about the use of GlobalCommitter.
>
> There are two questions I would like to ask:
> 1. What are the general usage scenarios of GlobalCommitter?
> 2. Why should GlobalCommitter be removed in the new version of the api?
>
> Thanks && Regards,
> di.wu
>
>