Default Flink S3 FileSource timeout due to large file listing

2023-09-25 Post by Eleanore Jin
Hello Flink Community,
Flink version: 1.16.1, ZooKeeper for HA.
My Flink application reads raw parquet files hosted in S3, applies
transformations, and rewrites them to S3 under a different location.
Below is the code I use to read the parquet files from S3:
```
final Configuration configuration = new Configuration();
configuration.set("fs.s3.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem");
// Note: the generic parameters and two constructor arguments were lost when the
// message was archived; "rowType" below stands in for the projected RowType of the parquet schema.
final ParquetColumnarRowInputFormat<FileSourceSplit> format =
  new ParquetColumnarRowInputFormat<>(
    configuration,
    rowType,                       // projected RowType (argument lost in the archive)
    InternalTypeInfo.of(rowType),  // produced type information
    100,                           // batch size
    true,
    true
  );
final FileSource<RowData> source = FileSource
  .forBulkFileFormat(format, new Path("s3/"))
  .build();
final DataStream<RowData> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
  "parquet-source");
```
I noticed the following:
1. My S3 directory, "s3//", can have more than 1M files. The
parquet files in this directory are partitioned by date and time, which makes the
folder structure of the directory deterministic, e.g.
"s3//partition_column_a/partition_column_b/2023-09-25--13/{1,2...N}.parquet".
I believe the default Flink FileSource does a listing on this large
directory and gets stuck waiting for the operation to complete. The Akka
connect timeout error messages in the Task Manager logs support this.
Additionally, the job runs successfully when I restrict the input to a
subfolder containing only an hour's data, based on the partitioning
scheme mentioned above. On my local machine, I also tried using the S3 CLI to
recursively list this directory, and the operation did not complete within 1
hour.

*Is this behavior expected based on Flink's S3 source implementation?* Looking
at the docs, one way to solve this is to implement the Split Enumerator so that it
incrementally processes the subfolders in "s3//", based on the partitioning
scheme mentioned above.

*Are there any other approaches available?*
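
One workaround (just a sketch; the bucket and partition names below are made up) is to hand the source the hourly sub-directories explicitly instead of the root, since forBulkFileFormat accepts multiple paths. That keeps every listing small without writing a custom Split Enumerator:
```
// Hypothetical: enumerate only the hour folders that need to be processed
// and pass them to the source as separate input paths (uses the same `format` as above).
List<Path> hourlyPaths = new ArrayList<>();
for (String hour : List.of("2023-09-25--12", "2023-09-25--13")) {
    hourlyPaths.add(new Path(
        "s3://my-bucket/partition_column_a/partition_column_b/" + hour + "/"));
}
final FileSource<RowData> source = FileSource
    .forBulkFileFormat(format, hourlyPaths.toArray(new Path[0]))
    .build();
```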
2. Following the code above, when I deserialize records from S3 I get
records of type BinaryRowData.
However, when I use the same code in unit testing, with
MiniClusterWithClientResource, to read from a local parquet file (not S3),
I get records of type GenericRowData.

*What is the reason for this discrepancy, and is it possible to force
deserialization to output type GenericRowData?* Currently, I have written
code to convert BinaryRowData to GenericRowData, as our downstream
ProcessFunctions expect this type.
*Is there a better solution to transform BinaryRowData to GenericRowData?*
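
In case it helps, a minimal sketch of one generic way to do that conversion, using the field getters derived from the logical row type (the `rowType` variable is assumed to be the same RowType used to build the input format above):
```
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

// Copies any RowData implementation (e.g. BinaryRowData) field by field
// into a GenericRowData, preserving the row kind.
public static GenericRowData toGenericRowData(RowData row, RowType rowType) {
    GenericRowData copy = new GenericRowData(row.getRowKind(), rowType.getFieldCount());
    for (int i = 0; i < rowType.getFieldCount(); i++) {
        RowData.FieldGetter getter = RowData.createFieldGetter(rowType.getTypeAt(i), i);
        copy.setField(i, getter.getFieldOrNull(row));
    }
    return copy;
}
```
This does not explain why the MiniCluster test sees GenericRowData while S3 sees BinaryRowData, but it avoids writing one converter per schema.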

Thanks!
Eleanore


Re: Container is running beyond physical memory limits

2021-02-20 Post by Eleanore Jin
Hi,
Here is an analysis article about OOM kills that I read a while ago; not sure whether it helps in your case:

http://www.whitewood.me/2021/01/02/%E8%AF%A6%E8%A7%A3-Flink-%E5%AE%B9%E5%99%A8%E5%8C%96%E7%8E%AF%E5%A2%83%E4%B8%8B%E7%9A%84-OOM-Killed/

On Thu, Feb 18, 2021 at 9:01 AM lian  wrote:

> Hi experts,
> 1. Background: we use Flink
> SQL to implement a retract stream. We use Last_Value, and a second-level aggregation does a sum; we mainly rely on the retract stream to subtract data that has already been sent downstream.
>   The functionality is basically the same as the Cainiao case here:
>   https://developer.aliyun.com/article/457392
> 2. Version and tuning: Flink
> 1.10.1, 2-6 GB per slot, using the RocksDB state backend to store state values; Flink's managed memory has been tuned repeatedly:
>    adjusted the overhead memory, min set to 1g, max set to 2g
>    block cache size raised from the default 8mb to 128mb
>    block size raised from the default 4kb to 32kb
>    flush thread count raised from the default 1 to 4
>    write buffer ratio lowered from the default 0.5 to 0.25
>    enabled the batch size and two-phase aggregation parameters
>    incremental checkpoints
>    memory pre-allocation set to false
> 3. State size: checkpoints fluctuate between 100mb and 25g; savepoints reach 120g
> 4. Problems:
>  1. Memory runs out and the container gets killed
>  2. At checkpoint time some checkpoints are very large and some very small; I don't know what causes this.
> 5. I have read many articles about OOM kills, adjusted the overhead parameters, increased the TM memory to enlarge managed
> memory, tuned the RocksDB parameters and a series of other parameters, but the container still gets killed after running for a while.
> 6. An open question of mine: I have enabled Flink's RocksDB metric options, but I am not clear how to print and monitor them; it would be nice if a future Flink
> UI could add a page for this.
> So it is still not clear to me which part of memory is being exceeded.
>
>
>    Could any expert take a look and suggest how best to optimize this?
>    I watched the 2020
> Flink Forward talk where 唐云 shared the work done on RocksDB; it mentioned that raising the overhead memory can solve this problem first, but I have not solved it yet. It also mentioned that improper usage by users can cause this problem; I am not sure whether my scenario is unreasonable.
>
>
>   Looking forward to a reply~~~
>
>
>
>
>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-10-02 Post by Eleanore Jin
Thanks a lot for the confirmation.

Eleanore

On Fri, Oct 2, 2020 at 2:42 AM Chesnay Schepler  wrote:

> Yes, the patch call only triggers the cancellation.
> You can check whether it is complete by polling the job status via
> jobs/ and checking whether state is CANCELED.
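
For example, with the job id from the cancel call quoted later in this thread, the polling could look like:

curl -X GET 'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a'
# keep polling until the "state" field in the response is "CANCELED"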
>
> On 9/27/2020 7:02 PM, Eleanore Jin wrote:
>
> I have noticed this: if I have Thread.sleep(1500); after the patch call
> returned 202, then the directory gets cleaned up, in the meanwhile, it
> shows the job-manager pod is in completed state before getting terminated:
> see screenshot: https://ibb.co/3F8HsvG
>
> So the patch call is async to terminate the job? Is there a way to check
> if cancel is completed? So that the stop tm and jm can be called afterwards?
>
> Thanks a lot!
> Eleanore
>
>
> On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin 
> wrote:
>
>> Hi Congxian,
>> I am making rest call to get the checkpoint config: curl -X GET \
>>
>> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>>
>> and here is the response:
>> {
>> "mode": "at_least_once",
>> "interval": 3000,
>> "timeout": 1,
>> "min_pause": 1000,
>> "max_concurrent": 1,
>> "externalization": {
>> "enabled": false,
>> "delete_on_cancellation": true
>> },
>> "state_backend": "FsStateBackend"
>> }
>>
>> I uploaded a screenshot of how azure blob storage looks like after the
>> cancel call : https://ibb.co/vY64pMZ
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
>> wrote:
>>
>>> Hi Eleanore
>>>
>>> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
>>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>>> checkpoint will be kept when canceling a job.
>>>
>>> PS the image did not show
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>> Best,
>>> Congxian
>>>
>>>
>>> Eleanore Jin wrote on Sun, Sep 27, 2020 at 1:50 PM:
>>>
>>>> Hi experts,
>>>>
>>>> I am running Flink 1.10.2 on Kubernetes as a per-job cluster. Checkpointing
>>>> is enabled, with interval 3s, minimum pause 1s, timeout 10s. I'm
>>>> using FsStateBackend, and snapshots are persisted to Azure Blob Storage
>>>> (Microsoft's cloud storage service).
>>>>
>>>> Checkpointed state is just source kafka topic offsets, the flink job is
>>>> stateless as it does filter/json transformation.
>>>>
>>>> The way I am trying to stop the flink job is via monitoring rest api
>>>> mentioned in doc
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-1>
>>>>
>>>> e.g.
>>>> curl -X PATCH \
>>>>   '
>>>> http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
>>>> \
>>>>   -H 'Content-Type: application/json' \
>>>>   -d '{}'
>>>>
>>>> This call returned successfully with statusCode 202, then I stopped the
>>>> task manager pods and job manager pod.
>>>>
>>>> According to the doc, the checkpoint should be cleaned up after the job
>>>> is stopped/cancelled.
>>>> What I have observed is that the checkpoint dir is not cleaned up. Can you
>>>> please shed some light on what I did wrong?
>>>>
>>>> Below shows the checkpoint dir for a cancelled flink job.
>>>> [image: image.png]
>>>>
>>>> Thanks!
>>>> Eleanore
>>>>
>>>>
>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-27 Post by Eleanore Jin
I have noticed this: if I have Thread.sleep(1500); after the patch call
returned 202, then the directory gets cleaned up, in the meanwhile, it
shows the job-manager pod is in completed state before getting terminated:
see screenshot: https://ibb.co/3F8HsvG

So the patch call is async to terminate the job? Is there a way to check if
cancel is completed? So that the stop tm and jm can be called afterwards?

Thanks a lot!
Eleanore


On Sun, Sep 27, 2020 at 9:37 AM Eleanore Jin  wrote:

> Hi Congxian,
> I am making rest call to get the checkpoint config: curl -X GET \
>
> http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config
>
> and here is the response:
> {
> "mode": "at_least_once",
> "interval": 3000,
> "timeout": 1,
> "min_pause": 1000,
> "max_concurrent": 1,
> "externalization": {
> "enabled": false,
> "delete_on_cancellation": true
> },
> "state_backend": "FsStateBackend"
> }
>
> I uploaded a screenshot of how azure blob storage looks like after the
> cancel call : https://ibb.co/vY64pMZ
>
> Thanks a lot!
> Eleanore
>
> On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
> wrote:
>
>> Hi Eleanore
>>
>> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
>> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
>> checkpoint will be kept when canceling a job.
>>
>> PS the image did not show
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>> Best,
>> Congxian
>>
>>
>> Eleanore Jin wrote on Sun, Sep 27, 2020 at 1:50 PM:
>>
>>> Hi experts,
>>>
>>> I am running Flink 1.10.2 on Kubernetes as a per-job cluster. Checkpointing
>>> is enabled, with interval 3s, minimum pause 1s, timeout 10s. I'm
>>> using FsStateBackend, and snapshots are persisted to Azure Blob Storage
>>> (Microsoft's cloud storage service).
>>>
>>> Checkpointed state is just source kafka topic offsets, the flink job is
>>> stateless as it does filter/json transformation.
>>>
>>> The way I am trying to stop the flink job is via monitoring rest api
>>> mentioned in doc
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-1>
>>>
>>> e.g.
>>> curl -X PATCH \
>>>   '
>>> http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
>>> \
>>>   -H 'Content-Type: application/json' \
>>>   -d '{}'
>>>
>>> This call returned successfully with statusCode 202, then I stopped the
>>> task manager pods and job manager pod.
>>>
>>> According to the doc, the checkpoint should be cleaned up after the job
>>> is stopped/cancelled.
>>> What I have observed is that the checkpoint dir is not cleaned up. Can you
>>> please shed some light on what I did wrong?
>>>
>>> Below shows the checkpoint dir for a cancelled flink job.
>>> [image: image.png]
>>>
>>> Thanks!
>>> Eleanore
>>>
>>>


Re: Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-27 Post by Eleanore Jin
Hi Congxian,
I am making rest call to get the checkpoint config: curl -X GET \

http://localhost:8081/jobs/d2c91a44f23efa2b6a0a89b9f1ca5a3d/checkpoints/config

and here is the response:
{
"mode": "at_least_once",
"interval": 3000,
"timeout": 1,
"min_pause": 1000,
"max_concurrent": 1,
"externalization": {
"enabled": false,
"delete_on_cancellation": true
},
"state_backend": "FsStateBackend"
}

I uploaded a screenshot of how azure blob storage looks like after the
cancel call : https://ibb.co/vY64pMZ

Thanks a lot!
Eleanore

On Sat, Sep 26, 2020 at 11:23 PM Congxian Qiu 
wrote:

> Hi Eleanore
>
> What the `CheckpointRetentionPolicy`[1] did you set for your job? if
> `ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION` is set, then the
> checkpoint will be kept when canceling a job.
>
> PS the image did not show
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
> Best,
> Congxian
>
>
> Eleanore Jin wrote on Sun, Sep 27, 2020 at 1:50 PM:
>
>> Hi experts,
>>
>> I am running Flink 1.10.2 on Kubernetes as a per-job cluster. Checkpointing is
>> enabled, with interval 3s, minimum pause 1s, timeout 10s. I'm
>> using FsStateBackend, and snapshots are persisted to Azure Blob Storage
>> (Microsoft's cloud storage service).
>>
>> Checkpointed state is just source kafka topic offsets, the flink job is
>> stateless as it does filter/json transformation.
>>
>> The way I am trying to stop the flink job is via monitoring rest api
>> mentioned in doc
>> <https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-1>
>>
>> e.g.
>> curl -X PATCH \
>>   '
>> http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
>> \
>>   -H 'Content-Type: application/json' \
>>   -d '{}'
>>
>> This call returned successfully with statusCode 202, then I stopped the
>> task manager pods and job manager pod.
>>
>> According to the doc, the checkpoint should be cleaned up after the job
>> is stopped/cancelled.
>> What I have observed is that the checkpoint dir is not cleaned up. Can you
>> please shed some light on what I did wrong?
>>
>> Below shows the checkpoint dir for a cancelled flink job.
>> [image: image.png]
>>
>> Thanks!
>> Eleanore
>>
>>


Checkpoint dir is not cleaned up after cancel the job with monitoring API

2020-09-26 Post by Eleanore Jin
Hi experts,

I am running Flink 1.10.2 on Kubernetes as a per-job cluster. Checkpointing is
enabled, with interval 3s, minimum pause 1s, timeout 10s. I'm
using FsStateBackend, and snapshots are persisted to Azure Blob Storage
(Microsoft's cloud storage service).

Checkpointed state is just source kafka topic offsets, the flink job is
stateless as it does filter/json transformation.
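
For reference, a minimal sketch of the checkpoint configuration described above (the Azure URI is a placeholder, not the real one):
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// interval 3s, at-least-once mode
env.enableCheckpointing(3000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // minimum pause 1s
env.getCheckpointConfig().setCheckpointTimeout(10000);          // timeout 10s

// snapshots persisted to Azure Blob Storage; placeholder container/account
env.setStateBackend(new FsStateBackend(
    "wasbs://checkpoints@myaccount.blob.core.windows.net/flink-checkpoints"));
```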

The way I am trying to stop the flink job is via monitoring rest api
mentioned in doc


e.g.
curl -X PATCH \
  'http://localhost:8081/jobs/3c00535c182a3a00258e2f57bc11fb1a?mode=cancel'
\
  -H 'Content-Type: application/json' \
  -d '{}'

This call returned successfully with statusCode 202, then I stopped the
task manager pods and job manager pod.

According to the doc, the checkpoint should be cleaned up after the job is
stopped/cancelled.
What I have observed is that the checkpoint dir is not cleaned up. Can you
please shed some light on what I did wrong?

Below shows the checkpoint dir for a cancelled flink job.
[image: image.png]

Thanks!
Eleanore


Re: Handling sink failures without consuming Kafka messages

2020-08-28 Post by Eleanore Jin
Hi shizk233,

Thank you very much for the explanation; the question that had puzzled me for a long time is finally clear! Thanks!


On Thu, Aug 27, 2020 at 10:28 PM shizk233 
wrote:

> Hi Eleanore,
>
> I think they are different. The difference is that Kafka auto commit happens when the source operator consumes a Kafka event (it does not wait for the data to be written out by the sink),
> while the checkpoint mechanism commits the offset only after all operators have completed the same checkpoint.
>
>
> Although the sink is stateless, that does not prevent it from participating in checkpoints. The condition for an operator to complete a checkpoint is that it has received the checkpoint barrier and has finished processing all data that arrived before the barrier.
> So when the checkpoint mechanism commits the offset, it can guarantee that the earlier data has already been written by the sink, which is at-least-once.
>
> Eleanore Jin wrote on Fri, Aug 28, 2020 at 1:17 AM:
>
> > Thanks everyone for the answers.
> >
> > I am using APACHE BEAM with the Flink RUNNER; this is the Kafka CONNECTOR provided by Beam. Looking at the source,
> > it does have state checkpointed: Beam KafkaIO KafkaUnboundedReader
> > <
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239
> > >
> > But looking at the sink, it has no state at all; it is a stateless operator: Beam KafkaIO KafkaWriter
> > <
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
> > >,
> > So what I want to confirm is: if the sink has no state, then with checkpointing enabled and only the source recording the offset, is that effectively the same as using
> > Kafka auto commit of offsets, i.e. neither at-least-once nor exactly-once can be guaranteed?
> >
> > Thanks!
> >
> > On Wed, Aug 26, 2020 at 7:31 PM 范超  wrote:
> >
> > > > Suppose messages 1-5 are being published by the sink operator; 1-3 have been published but 4 and 5 have not.
> > > > At this moment the source operator successfully completes a checkpoint, and the offset in this checkpoint should be > 5,
> > say 6.
> > > > If publishing message 4 then fails, the job restarts from the last successful
> > > > checkpoint, and the source operator starts reading from 6, so 4 and 5 are lost. Is this understanding correct?
> > >
> > >
> > >
> > > In my personal understanding, a partial failure in the sink stage should cause the sink's checkpoint to fail, and because of that failure the jobmanager will mark the whole checkpoint snapshot as failed.
> > > So after the restart, consumption will start again from 1 at the source.
> > >
> > >
> > > -----Original Message-----
> > > From: Benchao Li [mailto:libenc...@apache.org]
> > > Sent: Thursday, August 27, 2020, 10:06
> > > To: user-zh 
> > > Subject: Re: Handling sink failures without consuming Kafka messages
> > >
> > > Hi Eleanore, shizk233's explanation is already quite complete.
> > >
> > > Regarding the follow-up question you raised, I think that understanding is not quite correct.
> > > With checkpointing enabled, even though the kafka producer does not use two-phase commit, it is still guaranteed that when a checkpoint succeeds
> > > all data up to that point has been flushed out. If the flush fails, that should cause the checkpoint to fail. So my understanding is that this gives
> > > at-least-once semantics, i.e. data may be duplicated but will not be lost.
> > >
> > > Eleanore Jin wrote on Thu, Aug 27, 2020 at 9:53 AM:
> > >
> > > > Hi shizk233,
> > > >
> > > > Thank you very much for the answer! Consider this scenario: my DAG just reads from a kafka source topic and writes to a kafka sink
> > > > topic,
> > > > with no other stateful operator in between. If the sink operator is not two-phase commit but just a plain kafka producer send,
> > > > then with checkpointing enabled the state is only the source operator's kafka offset.
> > > >
> > > > Suppose messages 1-5 are being published by the sink operator; 1-3 have been published but 4 and 5 have not.
> > > > At this moment the source operator successfully completes a checkpoint, and the offset in this checkpoint should be > 5,
> > > say 6.
> > > > If publishing message 4 then fails, the job restarts from the last successful
> > > > checkpoint, and the source operator starts reading from 6, so 4 and 5 are lost. Is this understanding correct?
> > > >
> > > > Thanks!
> > > > Eleanore
> > > >
> > > > On Wed, Aug 26, 2020 at 9:32 AM shizk233 <
> > wangwangdaxian...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Eleanore, I can offer some understanding of this question for reference.
> > > > >
> > > > > 1. Checkpoints and at-least-once
> > > > > The idea of the checkpoint mechanism is to take a globally consistent snapshot. On failure recovery the consumption position rolls back to the progress of the last checkpoint n,
> > > > > and the data is replayed, which guarantees that no data is missed and every record is consumed at least once.
> > > > >
> > > > > 2. Sink 2PC
> > > > > Under the checkpoint mechanism, data already emitted by the sink generally cannot be rolled back during replay, so there will be duplicates. With an upsert sink the result is still consistent;
> > > > > otherwise a 2PC pre-commit is needed: the data produced before checkpoint n+1 succeeds is written to temporary storage, and is only written to the physical storage once checkpoint n+1 completes. If
> > > > > the job fails and rolls back before checkpoint n+1, the data in temporary storage can also be rolled back, which guarantees consistency.
> > > > >
> > > > > So checkpoints alone give at-least-once, and checkpoints + upsert or checkpoints + 2PC give exactly-once.
> > > > >
> > > > > 3. Kafka auto commit
> > > > > The state in a checkpoint snapshot is not only the source offsets but also the state of every operator in the data flow; the checkpoint mechanism commits the offset only once the whole data flow has completed the same checkpoint
> > > > > n.
> > > > > Kafka auto commit cannot guarantee this global consistency, because auto commit is automatic and does not wait for the whole data flow to complete the same checkpoint
> > > > > n.
> > > > >
> > > > > Eleanore Jin wrote on Wed, Aug 26, 2020 at 11:51 PM:
> > > > >
> > > > > > Hi Benchao,
> > > > > > Could you explain why, without two-phase commit in the sink, the semantics are at-least-once? For example, if both the source and the sink
> > > > > > are kafka, and if the
> > > > > sink
> > > > > > is not two-phase commit, then the checkpointed state is only the source offset; in that case it seems no different from using kafka auto
> > > > commit
> > > > > > of offsets.
> > > > > >
> > > > > > Could you explain in more detail? Thanks!
> > > > > >
> > > > > > Eleanore
> > > > > >
> > > > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li  >
> > > > wrote:
> > > > > >
> > > > > > > In this case you need to enable checkpointing to guarantee that data is not lost. If the sink has no two-phase commit, the semantics are at-least-once.
> > > > > > >
> > > > > > > 范超 wrote on Wed, Aug 26, 2020 at 11:38 AM:
> > > > > > >
> > > > > > > > Hi everyone, I have a question.
> > > > > > > > Currently I use kafka as the source; after computation, the results are sunk to a database.
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > Later the log database had a timeout or went down, but the kafka topic had already been consumed, causing data loss. How should things be configured so that when the sink fails, the kafka consumer offsets are not committed?
> > > > > > > >
> > > > > > > >
> > > > > > > > Many thanks.
> > > > > > > >
> > > > > > > > 范超
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


Re: Handling sink failures without consuming Kafka messages

2020-08-27 Post by Eleanore Jin
Thanks everyone for the answers.

I am using APACHE BEAM with the Flink RUNNER; this is the Kafka CONNECTOR provided by Beam. Looking at the source,
it does have state checkpointed: Beam KafkaIO KafkaUnboundedReader
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L239>
But looking at the sink, it has no state at all; it is a stateless operator: Beam KafkaIO KafkaWriter
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java>,
So what I want to confirm is: if the sink has no state, then with checkpointing enabled and only the source recording the offset, is that effectively the same as using
kafka auto commit of offsets, i.e. neither at-least-once nor exactly-once can be guaranteed?

Thanks!

On Wed, Aug 26, 2020 at 7:31 PM 范超  wrote:

> > Suppose messages 1-5 are being published by the sink operator; 1-3 have been published but 4 and 5 have not.
> > At this moment the source operator successfully completes a checkpoint, and the offset in this checkpoint should be > 5, say 6.
> > If publishing message 4 then fails, the job restarts from the last successful
> > checkpoint, and the source operator starts reading from 6, so 4 and 5 are lost. Is this understanding correct?
>
>
> In my personal understanding, a partial failure in the sink stage should cause the sink's checkpoint to fail, and because of that failure the jobmanager will mark the whole checkpoint snapshot as failed.
> So after the restart, consumption will start again from 1 at the source.
>
>
> -----Original Message-----
> From: Benchao Li [mailto:libenc...@apache.org]
> Sent: Thursday, August 27, 2020, 10:06
> To: user-zh 
> Subject: Re: Handling sink failures without consuming Kafka messages
>
> Hi Eleanore, shizk233's explanation is already quite complete.
>
> Regarding the follow-up question you raised, I think that understanding is not quite correct.
> With checkpointing enabled, even though the kafka producer does not use two-phase commit, it is still guaranteed that when a checkpoint succeeds
> all data up to that point has been flushed out. If the flush fails, that should cause the checkpoint to fail. So my understanding is that this gives
> at-least-once semantics, i.e. data may be duplicated but will not be lost.
>
> Eleanore Jin wrote on Thu, Aug 27, 2020 at 9:53 AM:
>
> > Hi shizk233,
> >
> > Thank you very much for the answer! Consider this scenario: my DAG just reads from a kafka source topic and writes to a kafka sink
> > topic,
> > with no other stateful operator in between. If the sink operator is not two-phase commit but just a plain kafka producer send,
> > then with checkpointing enabled the state is only the source operator's kafka offset.
> >
> > Suppose messages 1-5 are being published by the sink operator; 1-3 have been published but 4 and 5 have not.
> > At this moment the source operator successfully completes a checkpoint, and the offset in this checkpoint should be > 5,
> say 6.
> > If publishing message 4 then fails, the job restarts from the last successful
> > checkpoint, and the source operator starts reading from 6, so 4 and 5 are lost. Is this understanding correct?
> >
> > Thanks!
> > Eleanore
> >
> > On Wed, Aug 26, 2020 at 9:32 AM shizk233 
> > wrote:
> >
> > > Hi Eleanore, I can offer some understanding of this question for reference.
> > >
> > > 1. Checkpoints and at-least-once
> > > The idea of the checkpoint mechanism is to take a globally consistent snapshot. On failure recovery the consumption position rolls back to the progress of the last checkpoint n,
> > > and the data is replayed, which guarantees that no data is missed and every record is consumed at least once.
> > >
> > > 2. Sink 2PC
> > > Under the checkpoint mechanism, data already emitted by the sink generally cannot be rolled back during replay, so there will be duplicates. With an upsert sink the result is still consistent;
> > > otherwise a 2PC pre-commit is needed: the data produced before checkpoint n+1 succeeds is written to temporary storage, and is only written to the physical storage once checkpoint n+1 completes. If
> > > the job fails and rolls back before checkpoint n+1, the data in temporary storage can also be rolled back, which guarantees consistency.
> > >
> > > So checkpoints alone give at-least-once, and checkpoints + upsert or checkpoints + 2PC give exactly-once.
> > >
> > > 3. Kafka auto commit
> > > The state in a checkpoint snapshot is not only the source offsets but also the state of every operator in the data flow; the checkpoint mechanism commits the offset only once the whole data flow has completed the same checkpoint
> > > n.
> > > Kafka auto commit cannot guarantee this global consistency, because auto commit is automatic and does not wait for the whole data flow to complete the same checkpoint
> > > n.
> > >
> > > Eleanore Jin wrote on Wed, Aug 26, 2020 at 11:51 PM:
> > >
> > > > Hi Benchao,
> > > > Could you explain why, without two-phase commit in the sink, the semantics are at-least-once? For example, if both the source and the sink
> > > > are kafka, and if the
> > > sink
> > > > is not two-phase commit, then the checkpointed state is only the source offset; in that case it seems no different from using kafka auto
> > commit
> > > > of offsets.
> > > >
> > > > Could you explain in more detail? Thanks!
> > > >
> > > > Eleanore
> > > >
> > > > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li 
> > wrote:
> > > >
> > > > > In this case you need to enable checkpointing to guarantee that data is not lost. If the sink has no two-phase commit, the semantics are at-least-once.
> > > > >
> > > > > 范超 wrote on Wed, Aug 26, 2020 at 11:38 AM:
> > > > >
> > > > > > Hi everyone, I have a question.
> > > > > > Currently I use kafka as the source; after computation, the results are sunk to a database.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > Later the log database had a timeout or went down, but the kafka topic had already been consumed, causing data loss. How should things be configured so that when the sink fails, the kafka consumer offsets are not committed?
> > > > > >
> > > > > >
> > > > > > Many thanks.
> > > > > >
> > > > > > 范超
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Handling sink failures without consuming Kafka messages

2020-08-26 Post by Eleanore Jin
Hi shizk233,

Thank you very much for the answer! Consider this scenario: my DAG just reads from a kafka source topic and writes to a kafka sink topic,
with no other stateful operator in between. If the sink operator is not two-phase commit but just a plain kafka producer send,
then with checkpointing enabled the state is only the source operator's kafka offset.

Suppose messages 1-5 are being published by the sink operator; 1-3 have been published but 4 and 5 have not.
At this moment the source operator successfully completes a checkpoint, and the offset in this checkpoint should be > 5, say 6.
If publishing message 4 then fails, the job restarts from the last successful checkpoint,
and the source operator starts reading from 6, so 4 and 5 are lost. Is this understanding correct?

Thanks!
Eleanore

On Wed, Aug 26, 2020 at 9:32 AM shizk233 
wrote:

> Hi Eleanore, I can offer some understanding of this question for reference.
>
> 1. Checkpoints and at-least-once
> The idea of the checkpoint mechanism is to take a globally consistent snapshot. On failure recovery the consumption position rolls back to the progress of the last checkpoint n,
> and the data is replayed, which guarantees that no data is missed and every record is consumed at least once.
>
> 2. Sink 2PC
> Under the checkpoint mechanism, data already emitted by the sink generally cannot be rolled back during replay, so there will be duplicates. With an upsert sink the result is still consistent;
> otherwise a 2PC pre-commit is needed: the data produced before checkpoint n+1 succeeds is written to temporary storage, and is only written to the physical storage once checkpoint n+1 completes. If
> the job fails and rolls back before checkpoint n+1, the data in temporary storage can also be rolled back, which guarantees consistency.
>
> So checkpoints alone give at-least-once, and checkpoints + upsert or checkpoints + 2PC give exactly-once.
>
> 3. Kafka auto commit
> The state in a checkpoint snapshot is not only the source offsets but also the state of every operator in the data flow; the checkpoint mechanism commits the offset only once the whole data flow has completed the same checkpoint
> n.
> Kafka auto commit cannot guarantee this global consistency, because auto commit is automatic and does not wait for the whole data flow to complete the same checkpoint n.
>
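
To make the 2PC point concrete, a sketch of what an end-to-end exactly-once Kafka sink looks like in Flink's own DataStream API (topic, schema, and properties are placeholders; Beam's KafkaIO has its own EOS mode instead):
```
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// must not exceed the broker's transaction.max.timeout.ms
props.setProperty("transaction.timeout.ms", "900000");

// Pre-commits data on each checkpoint and only commits the Kafka transaction
// once the checkpoint completes, giving exactly-once instead of at-least-once.
FlinkKafkaProducer<String> exactlyOnceSink = new FlinkKafkaProducer<>(
    "output-topic",
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
    props,
    FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

stream.addSink(exactlyOnceSink);  // checkpointing must be enabled on the environment
```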
> Eleanore Jin wrote on Wed, Aug 26, 2020 at 11:51 PM:
>
> > Hi Benchao,
> > Could you explain why, without two-phase commit in the sink, the semantics are at-least-once? For example, if both the source and the sink are kafka, and if the
> sink
> > is not two-phase commit, then the checkpointed state is only the source offset; in that case it seems no different from using kafka auto commit
> > of offsets.
> >
> > Could you explain in more detail? Thanks!
> >
> > Eleanore
> >
> > On Tue, Aug 25, 2020 at 9:59 PM Benchao Li  wrote:
> >
> > > In this case you need to enable checkpointing to guarantee that data is not lost. If the sink has no two-phase commit, the semantics are at-least-once.
> > >
> > > 范超 wrote on Wed, Aug 26, 2020 at 11:38 AM:
> > >
> > > > Hi everyone, I have a question.
> > > > Currently I use kafka as the source; after computation, the results are sunk to a database.
> > > >
> > > >
> > >
> >
> Later the log database had a timeout or went down, but the kafka topic had already been consumed, causing data loss. How should things be configured so that when the sink fails, the kafka consumer offsets are not committed?
> > > >
> > > >
> > > > Many thanks.
> > > >
> > > > 范超
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>


Re: Handling sink failures without consuming Kafka messages

2020-08-26 Post by Eleanore Jin
Hi Benchao,
Could you explain why, without two-phase commit in the sink, the semantics are at-least-once? For example, if both the source and the sink are kafka, and if the sink
is not two-phase commit, then the checkpointed state is only the source offset; in that case it seems no different from using kafka auto commit
of offsets.

Could you explain in more detail? Thanks!

Eleanore

On Tue, Aug 25, 2020 at 9:59 PM Benchao Li  wrote:

> In this case you need to enable checkpointing to guarantee that data is not lost. If the sink has no two-phase commit, the semantics are at-least-once.
>
> 范超 wrote on Wed, Aug 26, 2020 at 11:38 AM:
>
> > Hi everyone, I have a question.
> > Currently I use kafka as the source; after computation, the results are sunk to a database.
> >
> >
> Later the log database had a timeout or went down, but the kafka topic had already been consumed, causing data loss. How should things be configured so that when the sink fails, the kafka consumer offsets are not committed?
> >
> >
> > Many thanks.
> >
> > 范超
> >
>
>
> --
>
> Best,
> Benchao Li
>


Cannot restore a job from a savepoint after changing source or sink operators

2020-08-10 Post by Eleanore Jin
A question for everyone:

I am using Beam 2.23.0 with Flink runner 1.8.2. I want to experiment with what happens when, after enabling checkpointing and Beam KafkaIO
EOS (exactly-once semantics), I add or remove source/sink operators
and then restore the job from a savepoint. I am running Kafka and a Flink cluster (1 job manager, 1 task
manager) on my laptop.

Below are the different scenarios I tried:

1. Add a source topic after the SAVEPOINT
Before the savepoint: read from input1 and write to output
Take a savepoint
After the savepoint: read from input1 and input2 and write to output
Result: no data from input2 appears in output

2. Remove a source topic after the SAVEPOINT
Before the savepoint: read from input1 and input2 and write to output
Take a savepoint
After the savepoint: read from input1 and write to output
Result: runs normally; output only contains data from input1

3. Add a sink topic after the SAVEPOINT
Before the savepoint: read from input1 and write to output1
Take a savepoint
After the savepoint: read from input1 and write to output1 and output2
Result: pipeline failed with an exception
[image: image.png]

4. Remove a sink topic after the SAVEPOINT
Before the savepoint: read from input1 and write to output1 and output2
Take a savepoint
After the savepoint: read from input1 and write to output1
Result: it requires changing the sinkGroupId, otherwise I get an exception
[image: image.png]

It looks like after changing the SOURCE or SINK, the job basically cannot be restored from the SAVEPOINT. Is this the expected behavior of Flink,
or could it be caused by the way Beam KafkaIO exactly-once is implemented, or is it a problem with my configuration?

Thanks!
Eleanore
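
One generic Flink note that may or may not apply here (paths and the jar name below are placeholders): when operators are removed, restoring from a savepoint normally also requires telling Flink to skip the state of the dropped operators, e.g.:

flink run -s /path/to/savepoint -n my-beam-pipeline.jar
# -n / --allowNonRestoredState skips savepoint state that no longer maps to any operator

Whether this interacts with Beam KafkaIO's exactly-once sink is a separate question.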


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-06 Post by Eleanore Jin
Hi Yang,

Thanks a lot for the information!

Eleanore

On Thu, Aug 6, 2020 at 4:20 AM Yang Wang  wrote:

> Hi Eleanore,
>
> From my experience, collecting the Flink metrics to prometheus via metrics
> collector is a more ideal way. It is
> also easier to configure the alert.
> Maybe you could use "fullRestarts" or "numRestarts" to monitor the job
> restarting. More metrics can be found
> here[2].
>
> [1].
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#prometheus-orgapacheflinkmetricsprometheusprometheusreporter
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#availability
>
> Best,
> Yang
>
> Eleanore Jin wrote on Wed, Aug 5, 2020 at 11:52 PM:
>
>> Hi Yang and Till,
>>
>> Thanks a lot for the help! I have the similar question as Till mentioned,
>> if we do not fail Flink pods when the restart strategy is exhausted, it
>> might be hard to monitor such failures. Today I get alerts if the k8s pods
>> are restarted or in crash loop, but if this will no longer be the case, how
>> can we deal with the monitoring? In production, I have hundreds of small
>> flink jobs running (2-8 TM pods) doing stateless processing, it is really
>> hard for us to expose ingress for each JM rest endpoint to periodically
>> query the job status for each flink job.
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann 
>> wrote:
>>
>>> You are right Yang Wang.
>>>
>>> Thanks for creating this issue.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>>>
>>>> Actually, the application status shown in the YARN web UI is not determined
>>>> by the jobmanager process exit code.
>>>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>>>> control the final status of the YARN application.
>>>> So although the jobmanager exits with a zero code, it could still show failed
>>>> status in the YARN web UI.
>>>>
>>>> I have created a ticket to track this improvement[1].
>>>>
>>>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>>
>>>>> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>>>>
>>>>> Yes for the other deployments it is not a problem. A reason why people
>>>>> preferred non-zero exit codes in case of FAILED jobs is that this is 
>>>>> easier
>>>>> to monitor than having to take a look at the actual job result. Moreover,
>>>>> in the YARN web UI the application shows as failed if I am not mistaken.
>>>>> However, from a framework's perspective, a FAILED job does not mean that
>>>>> Flink has failed and, hence, the return code could still be 0 in my 
>>>>> opinion.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang 
>>>>> wrote:
>>>>>
>>>>>> Hi Eleanore,
>>>>>>
>>>>>> Yes, I suggest to use Job to replace Deployment. It could be used
>>>>>> to run jobmanager one time and finish after a successful/failed 
>>>>>> completion.
>>>>>>
>>>>>> However, using Job still could not solve your problem completely.
>>>>>> Just as Till said, When a job exhausts the restart strategy, the 
>>>>>> jobmanager
>>>>>> pod will terminate with non-zero exit code. It will cause the K8s
>>>>>> restarting it again. Even though we could set the resartPolicy and
>>>>>> backoffLimit,
>>>>>> this is not a clean and correct way to go. We should terminate the
>>>>>> jobmanager process with zero exit code in such situation.
>>>>>>
>>>>>> @Till Rohrmann  I just have one concern. Is it
>>>>>> a special case for K8s deployment? For standalone/Yarn/Mesos, it seems 
>>>>>> that
>>>>>> terminating with
>>>>>> non-zero exit code is harmless.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>>>>>
>>>>>>> Hi Yang & Till,
>>>>>>>

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-05 Post by Eleanore Jin
Hi Yang and Till,

Thanks a lot for the help! I have a similar question to the one Till mentioned:
if we do not fail Flink pods when the restart strategy is exhausted, it
might be hard to monitor such failures. Today I get alerts if the k8s pods
are restarted or in a crash loop, but if this is no longer the case, how
can we deal with the monitoring? In production, I have hundreds of small
flink jobs running (2-8 TM pods) doing stateless processing, and it is really
hard for us to expose an ingress for each JM rest endpoint to periodically
query the job status for each flink job.

Thanks a lot!
Eleanore

On Wed, Aug 5, 2020 at 4:56 AM Till Rohrmann  wrote:

> You are right Yang Wang.
>
> Thanks for creating this issue.
>
> Cheers,
> Till
>
> On Wed, Aug 5, 2020 at 1:33 PM Yang Wang  wrote:
>
>> Actually, the application status shown in the YARN web UI is not determined
>> by the jobmanager process exit code.
>> Instead, we use "resourceManagerClient.unregisterApplicationMaster" to
>> control the final status of the YARN application.
>> So although the jobmanager exits with a zero code, it could still show failed
>> status in the YARN web UI.
>>
>> I have created a ticket to track this improvement[1].
>>
>> [1]. https://issues.apache.org/jira/browse/FLINK-18828
>>
>>
>> Best,
>> Yang
>>
>>
>>> Till Rohrmann wrote on Wed, Aug 5, 2020 at 3:56 PM:
>>
>>> Yes for the other deployments it is not a problem. A reason why people
>>> preferred non-zero exit codes in case of FAILED jobs is that this is easier
>>> to monitor than having to take a look at the actual job result. Moreover,
>>> in the YARN web UI the application shows as failed if I am not mistaken.
>>> However, from a framework's perspective, a FAILED job does not mean that
>>> Flink has failed and, hence, the return code could still be 0 in my opinion.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 5, 2020 at 9:30 AM Yang Wang  wrote:
>>>
>>>> Hi Eleanore,
>>>>
>>>> Yes, I suggest to use Job to replace Deployment. It could be used
>>>> to run jobmanager one time and finish after a successful/failed completion.
>>>>
>>>> However, using Job still could not solve your problem completely. Just
>>>> as Till said, When a job exhausts the restart strategy, the jobmanager
>>>> pod will terminate with non-zero exit code. It will cause the K8s
>>>> restarting it again. Even though we could set the resartPolicy and
>>>> backoffLimit,
>>>> this is not a clean and correct way to go. We should terminate the
>>>> jobmanager process with zero exit code in such situation.
>>>>
>>>> @Till Rohrmann  I just have one concern. Is it a
>>>> special case for K8s deployment? For standalone/Yarn/Mesos, it seems that
>>>> terminating with
>>>> non-zero exit code is harmless.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 11:54 PM:
>>>>
>>>>> Hi Yang & Till,
>>>>>
>>>>> Thanks for your prompt reply!
>>>>>
>>>>> Yang, regarding your question, I am actually not using k8s job, as I
>>>>> put my app.jar and its dependencies under flink's lib directory. I have 1
>>>>> k8s deployment for job manager, and 1 k8s deployment for task manager, and
>>>>> 1 k8s service for job manager.
>>>>>
>>>>> As you mentioned above, if flink job is marked as failed, it will
>>>>> cause the job manager pod to be restarted. Which is not the ideal
>>>>> behavior.
>>>>>
>>>>> Do you suggest that I should change the deployment strategy from using
>>>>> k8s deployment to k8s job? In case the flink program exit with non-zero
>>>>> code (e.g. exhausted number of configured restart), pod can be marked as
>>>>> complete hence not restarting the job again?
>>>>>
>>>>> Thanks a lot!
>>>>> Eleanore
>>>>>
>>>>> On Tue, Aug 4, 2020 at 2:49 AM Yang Wang 
>>>>> wrote:
>>>>>
>>>>>> @Till Rohrmann  In native mode, when a Flink
>>>>>> application terminates with FAILED state, all the resources will be 
>>>>>> cleaned
>>>>>> up.
>>>>>>
>>>>>> However, in standalone mode, I agree with you that we need to rethink
>>>>>> the exit code

Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-04 Post by Eleanore Jin
Hi Yang & Till,

Thanks for your prompt reply!

Yang, regarding your question, I am actually not using k8s job, as I put my
app.jar and its dependencies under flink's lib directory. I have 1 k8s
deployment for job manager, and 1 k8s deployment for task manager, and 1
k8s service for job manager.

As you mentioned above, if the flink job is marked as failed, it will cause the
job manager pod to be restarted, which is not the ideal behavior.

Do you suggest that I change the deployment strategy from using a k8s
deployment to a k8s job? In case the flink program exits with a non-zero code
(e.g. the configured number of restarts is exhausted), the pod can be marked as
complete and hence the job is not restarted again?

Thanks a lot!
Eleanore

On Tue, Aug 4, 2020 at 2:49 AM Yang Wang  wrote:

> @Till Rohrmann  In native mode, when a Flink
> application terminates with FAILED state, all the resources will be cleaned
> up.
>
> However, in standalone mode, I agree with you that we need to rethink the
> exit code of Flink. When a job exhausts the restart
> strategy, we should terminate the pod and do not restart again. After
> googling, it seems that we could not specify the restartPolicy
> based on exit code[1]. So maybe we need to return a zero exit code to
> avoid restarting by K8s.
>
> [1].
> https://stackoverflow.com/questions/48797297/is-it-possible-to-define-restartpolicy-based-on-container-exit-code
>
> Best,
> Yang
>
> Till Rohrmann wrote on Tue, Aug 4, 2020 at 3:48 PM:
>
>> @Yang Wang  I believe that we should rethink the
>> exit codes of Flink. In general you want K8s to restart a failed Flink
>> process. Hence, an application which terminates in state FAILED should not
>> return a non-zero exit code because it is a valid termination state.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 4, 2020 at 8:55 AM Yang Wang  wrote:
>>
>>> Hi Eleanore,
>>>
>>> I think you are using K8s resource "Job" to deploy the jobmanager.
>>> Please set .spec.template.spec.restartPolicy = "Never" and
>>> spec.backoffLimit = 0.
>>> Refer here[1] for more information.
>>>
>>> Then, when the jobmanager failed because of any reason, the K8s job will
>>> be marked failed. And K8s will not restart the job again.
>>>
>>> [1].
>>> https://kubernetes.io/docs/concepts/workloads/controllers/job/#job-termination-and-cleanup
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Eleanore Jin wrote on Tue, Aug 4, 2020 at 12:05 AM:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for the reply!
>>>>
>>>> I manually deploy as per-job mode [1] and I am using Flink 1.8.2.
>>>> Specifically, I build a custom docker image, which I copied the app jar
>>>> (not uber jar) and all its dependencies under /flink/lib.
>>>>
>>>> So my question is more like, in this case, if the job is marked as
>>>> FAILED, which causes k8s to restart the pod, this seems not help at all,
>>>> what are the suggestions for such scenario?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes
>>>>
>>>> On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Eleanore,
>>>>>
>>>>> how are you deploying Flink exactly? Are you using the application
>>>>> mode with native K8s support to deploy a cluster [1] or are you manually
>>>>> deploying a per-job mode [2]?
>>>>>
>>>>> I believe the problem might be that we terminate the Flink process
>>>>> with a non-zero exit code if the job reaches the ApplicationStatus.FAILED
>>>>> [3].
>>>>>
>>>>> cc Yang Wang have you observed a similar behavior when running Flink
>>>>> in per-job mode on K8s?
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
>>>>> [3]
>>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>>>>>
>>>>> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
>>>>> wrote:
>>>>>
>>>>>> Hi Experts,
>>>>>>
>>>>>> I have a flink cluster (per job mode) running on kubernetes. The job
>>>>>> is configured with restart strategy
>>>>>>
>>>>>> restart-strategy.fixed-delay.attempts: 
>>>>>> 3restart-strategy.fixed-delay.delay: 10 s
>>>>>>
>>>>>>
>>>>>> So after 3 times retry, the job will be marked as FAILED, hence the
>>>>>> pods are not running. However, kubernetes will then restart the job again
>>>>>> as the available replicas do not match the desired one.
>>>>>>
>>>>>> I wonder what are the suggestions for such a scenario? How should I
>>>>>> configure the flink job running on k8s?
>>>>>>
>>>>>> Thanks a lot!
>>>>>> Eleanore
>>>>>>
>>>>>


Re: Behavior for flink job running on K8S failed after restart strategy exhausted

2020-08-03 Post by Eleanore Jin
Hi Till,

Thanks for the reply!

I manually deploy in per-job mode [1] and I am using Flink 1.8.2.
Specifically, I build a custom docker image into which I copied the app jar
(not an uber jar) and all its dependencies under /flink/lib.

So my question is more like: in this case, if the job is marked as FAILED,
which causes k8s to restart the pod, this does not seem to help at all. What are
the suggestions for such a scenario?

Thanks a lot!
Eleanore

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html#flink-job-cluster-on-kubernetes

On Mon, Aug 3, 2020 at 2:13 AM Till Rohrmann  wrote:

> Hi Eleanore,
>
> how are you deploying Flink exactly? Are you using the application mode
> with native K8s support to deploy a cluster [1] or are you manually
> deploying a per-job mode [2]?
>
> I believe the problem might be that we terminate the Flink process with a
> non-zero exit code if the job reaches the ApplicationStatus.FAILED [3].
>
> cc Yang Wang have you observed a similar behavior when running Flink in
> per-job mode on K8s?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#flink-kubernetes-application
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#job-cluster-resource-definitions
> [3]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ApplicationStatus.java#L32
>
> On Fri, Jul 31, 2020 at 6:26 PM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>>
>> I have a flink cluster (per job mode) running on kubernetes. The job is
>> configured with restart strategy
>>
>> restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 
>> 10 s
>>
>>
>> So after 3 times retry, the job will be marked as FAILED, hence the pods
>> are not running. However, kubernetes will then restart the job again as the
>> available replicas do not match the desired one.
>>
>> I wonder what are the suggestions for such a scenario? How should I
>> configure the flink job running on k8s?
>>
>> Thanks a lot!
>> Eleanore
>>
>


Behavior for flink job running on K8S failed after restart strategy exhausted

2020-07-31 Post by Eleanore Jin
Hi Experts,

I have a flink cluster (per-job mode) running on kubernetes. The job is
configured with the restart strategy

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s


So after 3 retries, the job will be marked as FAILED, and hence the pods
are not running. However, kubernetes will then restart the job again because the
available replicas do not match the desired count.

I wonder what are the suggestions for such a scenario? How should I
configure the flink job running on k8s?

Thanks a lot!
Eleanore


Re: Flink Training - why cannot keyBy hour?

2020-07-02 Post by Eleanore Jin
Hi David,

Thanks a lot for the explanation!

Eleanore

On Thu, Jul 2, 2020 at 6:30 AM David Anderson  wrote:

> Eleanore,
>
> Yes, if you change the implementation in the way that is suggested by the
> slide, the tests will fail. But it's more interesting to observe the
> behavior in the console.
>
> The notes that go with that slide explain the situation in more detail.
> (Use alt-p or option-p to see the notes). But to recap here, there are two
> related effects:
>
> (1) Instead of producing a single result at the end of the window, this
> alternative implementation produces a result for every event. In other
> words, it produces a stream that eventually arrives at the same maximum
> value produced by the timeWindowAll.
>
> (2) With timeWindowAll, once the results for a given hour have been
> produced, Flink frees the state associated with the window for that hour.
> It knows, based on the watermarking, that no more events are expected, so
> the state is no longer needed and can be cleared. But with maxBy, the state
> for each key (each hour) is kept forever. This is why this is not a good
> approach: the keyspace is unbounded, and we can't intervene to clean up
> stale state.
>
> Regards,
> David
>
> On Wed, Jul 1, 2020 at 2:26 AM Eleanore Jin 
> wrote:
>
>> Hi experts,
>>
>> I am going through Ververica flink training, and when doing the lab with
>> window (https://training.ververica.com/exercises/windows), basically it
>> requires to compute within an hour which driver earns the most tip.
>>
>> The logic is to
>> 0. keyBy driverId
>> 1. create 1 hour window based on eventTime
>> 2. sum up all the tips for this driver within this 1 hour window
>> 3. create an 1 hour globalWindow for all drivers
>> 4. find the max tips
>>
>> sample code shown as below.
>>
>> SingleOutputStreamOperator> 
>> aggregatedTipsPerDriver = fares.keyBy(rides -> rides.driverId)
>>  .window(TumblingEventTimeWindows.of(Time.hours(1)))
>>  .process(new SumTipsFunction());
>>
>> // Tuple3: reporting the timestamp for the end of the hour, the driverId, 
>> and the total of that driver's tips for that hour
>> SingleOutputStreamOperator> hourlyMax =
>>  
>> aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
>>  .maxBy(2);
>>
>>
>> The question is shown as 4th slide: why we cannot keyed by the hour?
>>
>> If I change the implementation to keyBy hour and run the HourlyTipsTest,
>>
>> the test of testMaxAcrossDrivers will fail:
>>
>> // (94668840,1,6.0) -> for timestamp window: 94668840, driverId: 1, 
>> earns most tip: 6.0
>>
>> Expected :[(94668840,1,6.0), (94669200,2,20.0)]
>> Actual   :[(94668840,1,6.0), (94669200,2,20.0), 
>> (94669200,2,20.0)]
>>
>>
>> [image: image.png]
>>
>> Thanks a lot!
>> Eleanore
>>
>>


Flink Training - why cannot keyBy hour?

2020-06-30 Post by Eleanore Jin
Hi experts,

I am going through Ververica flink training, and when doing the lab with
window (https://training.ververica.com/exercises/windows), basically it
requires to compute within an hour which driver earns the most tip.

The logic is to
0. keyBy driverId
1. create 1 hour window based on eventTime
2. sum up all the tips for this driver within this 1 hour window
3. create an 1 hour globalWindow for all drivers
4. find the max tips

sample code shown as below.

// Element type reconstructed from the comment below:
// Tuple3<end-of-hour timestamp, driverId, total tips>
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> aggregatedTipsPerDriver =
 fares.keyBy(rides -> rides.driverId)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .process(new SumTipsFunction());

// Tuple3: reporting the timestamp for the end of the hour, the driverId,
// and the total of that driver's tips for that hour
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax =
 aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
  .maxBy(2);


The question is shown on the 4th slide: why can't we key by the hour?
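
For reference, the keyBy-the-hour variant the slide hints at would look roughly like this (assuming the Tuple3<Long, Long, Float> element type described above); as David explains below, unlike the windowAll version it emits an updated maximum for every incoming element and keeps the per-hour state around indefinitely:
```
// Key the per-driver hourly sums by their end-of-hour timestamp (f0)
// and keep a running maximum of the tips field (position 2).
SingleOutputStreamOperator<Tuple3<Long, Long, Float>> hourlyMax =
    aggregatedTipsPerDriver
        .keyBy(t -> t.f0)
        .maxBy(2);
```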

If I change the implementation to keyBy hour and run the HourlyTipsTest,

the test of testMaxAcrossDrivers will fail:

// (94668840,1,6.0) -> for timestamp window: 94668840,
driverId: 1, earns most tip: 6.0

Expected :[(94668840,1,6.0), (94669200,2,20.0)]
Actual   :[(94668840,1,6.0), (94669200,2,20.0), (94669200,2,20.0)]


[image: image.png]

Thanks a lot!
Eleanore


Re: run flink on edge vs hub

2020-05-18 Post by Eleanore Jin
Hi Arvid,

Thanks for the suggestion! I will try it out and see how it works.

Best,
Eleanore

On Mon, May 18, 2020 at 8:04 AM Arvid Heise  wrote:

> Hi Eleanore,
>
> The question in general is what you understand under edge data centers as
> the term is pretty fuzzy. Since Flink is running on Java, it's not suitable
> for embedded clusters as of now. There is plenty of work done already to
> test that Flink runs on ARM clusters [1].
>
> If you just mean in general moving away from a monolithic hub cluster to
> smaller clusters, then this is easily done with Flink on the compute side.
> The question is rather how data storage should look in such an edge setting
> and how the interfaces look.
>
> From your example, it seems as if you want to use Flink as a reactive
> server, possibly easily scalable. If so, then yes it is possible with
> Flink, even though I'd say it's not the primary use case for Flink. In any
> case, synchronous requests will be a bit difficult/unnatural. I'd probably
> go for an async job pattern. So Flink listens to some port for requests (
> socketTextStream [2]) with a job id, processes data and keeps the data in
> state keyed by job id. The client then uses the job id to fetch the job
> state through queryable state [2]. The responses eventually time out
> through TTL [4].
>
> Of course, you'd put a small proxy in front of that composited job
> (separate input/query port) that translates the queries from the client to
> the Flink job. The proxy would most likely also generate the job id and
> return it to the client. Ultimately, that proxy could offer a synchronous
> interface and pull for the result itself, but that makes the proxy suddenly
> quite heavy.
>
> The proxy setup can be reused for different edge clusters making it a one
> time investment. Note that there are other software stacks for reactive
> servers that offer the functionality out of the box.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-ARM-support-for-Flink-td30298.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/datastream_api.html#data-sources
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
> [4]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>
> On Mon, May 18, 2020 at 4:39 AM Eleanore Jin 
> wrote:
>
>> Hi Community,
>>
>> Currently we are running flink in 'hub' data centers where data is
>> ingested into the platform via kafka, and flink job will read from kafka,
>> do the transformations, and publish to another kafka topic.
>>
>> I would also like to see if the same logic (read input message -> do
>> transformation -> return output message) can be applied on 'edge' data
>> centers.
>>
>> The requirement for run on 'edge' is to return the response
>> synchronously. Like the synchronous http based request/response.
>>
>> Can you please provide some guidance/thoughts on this?
>>
>> Thanks a lot!
>> Eleanore
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


run flink on edge vs hub

2020-05-17 Post by Eleanore Jin
Hi Community,

Currently we are running flink in 'hub' data centers where data is ingested
into the platform via kafka, and flink job will read from kafka, do the
transformations, and publish to another kafka topic.

I would also like to see if the same logic (read input message -> do
transformation -> return output message) can be applied on 'edge' data
centers.

The requirement for running on the 'edge' is to return the response synchronously,
like a synchronous HTTP-based request/response.

Can you please provide some guidance/thoughts on this?

Thanks a lot!
Eleanore


Re: Broadcast stream causing GC overhead limit exceeded

2020-05-06 Post by Eleanore Jin
Hi Fabian,

I just got confirmation from the Apache Beam community: Beam will buffer the
data until there is data from the broadcast stream.

Thanks!
Eleanore

On Tue, May 5, 2020 at 12:31 AM Fabian Hueske  wrote:

> Hi Eleanore,
>
> The "GC overhead limit exceeded" error shows that the JVM spends way too
> much time garbage collecting and only recovers little memory with every run.
> Since, the program doesn't make any progress in such a situation it is
> terminated with the GC Overhead Error. This typically happens when lots of
> temporary objects are created.
> The root cause could be Flink, Beam, or your own code.
> It's important to understand that this error is not directly related to a
> shortage of memory (although more memory can help to mitigate the issue a
> bit) but rather indicates an implementation issue.
>
> Coming back to your question, Flink's Broadcast stream does *not* block or
> collect events from the non-broadcasted side if the broadcast side doesn't
> serve events.
> However, the user-implemented operators (Beam or your code in this case)
> often puts non-broadcasted events into state to wait for input from the
> other side.
> Since the error is not about lack of memory, the buffering in Flink state
> might not be the problem here.
>
> Best, Fabian
>
>
>
>
>
> Am So., 3. Mai 2020 um 03:39 Uhr schrieb Eleanore Jin <
> eleanore@gmail.com>:
>
>> Hi All,
>>
>> I am using apache Beam with Flink (1.8.2). In my job, I am using Beam
>> sideinput (which translates into Flink NonKeyedBroadcastStream) to do
>> filter of the data from main stream.
>>
>> I have experienced OOM: GC overhead limit exceeded continuously.
>>
>> After did some experiments, I observed following behaviour:
>> 1. run job without side input(broadcast stream): no OOM issue
>> 2. run job with side input (kafka topic with 1 partition) with data
>> available from this side input: no OOM issue
>> 3. run job with side input (kafka topic with 1 partition) without any
>> data from the side input: *OOM issue*
>> 4. From the heap dump, the message (of type ObjectNode) cannot be GC'd
>> looks like due to the references hold by Broadcast stream
>> [image: image.png]
>>
>> My question is: what is the behaviour from Broadcast stream if there is
>> no data available? Does it cache the data from main stream and wait until
>> data becoming available from Broadcast stream to process?
>>
>> Thanks a lot!
>> Eleanore
>>
>


Export user metrics with Flink Prometheus endpoint

2020-05-05 Post by Eleanore Jin
Hi all,

I just wonder whether it is possible to use the Flink metrics endpoint to allow
Prometheus to scrape user-defined metrics?

Context:
In addition to Flink metrics, we also collect some application-level
metrics using OpenCensus, and we run the OpenCensus agent as a sidecar in the
Kubernetes pod to collect metrics (the OpenCensus agent talks to the task manager
container via RPCs).

The issue with this approach is that the OpenCensus agent seems to keep
stale metrics, making the reported metrics inaccurate, and the project
is not actively maintained anymore.

So I wonder if it is possible to use the Flink metrics endpoint for user-defined
metrics.
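
If it helps, user-defined metrics registered through Flink's own metric API are exposed by whatever reporter is configured, alongside the built-in metrics. A small sketch (the class and metric names are made up):
```
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMap extends RichMapFunction<String, String> {
    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // registered under the operator's metric group, so it is reported
        // through the same endpoint as Flink's built-in metrics
        processed = getRuntimeContext().getMetricGroup().counter("myAppRecordsProcessed");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}
```
With `metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter` in flink-conf.yaml, Prometheus can then scrape both kinds of metrics from the task manager pods.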

Thanks a lot!
Eleanore


Broadcast stream causing GC overhead limit exceeded

2020-05-02 Post by Eleanore Jin
Hi All,

I am using apache Beam with Flink (1.8.2). In my job, I am using Beam
sideinput (which translates into Flink NonKeyedBroadcastStream) to do
filter of the data from main stream.

I have experienced OOM: GC overhead limit exceeded continuously.

After doing some experiments, I observed the following behaviour:
1. run job without side input(broadcast stream): no OOM issue
2. run job with side input (kafka topic with 1 partition) with data
available from this side input: no OOM issue
3. run job with side input (kafka topic with 1 partition) without any data
from the side input: *OOM issue*
4. From the heap dump, the message (of type ObjectNode) cannot be GC'd
looks like due to the references hold by Broadcast stream
[image: image.png]

My question is: what is the behaviour of the broadcast stream if there is no
data available? Does it cache the data from the main stream and wait until data
becomes available from the broadcast stream before processing?

Thanks a lot!
Eleanore


Flink Task Manager GC overhead limit exceeded

2020-04-29 Post by Eleanore Jin
Hi All,

Currently I am running a flink job cluster (v1.8.2) on kubernetes with 4
pods, each pod with 4 parallelism.

The flink job reads from a source topic with 96 partitions and does a
per-element filter; the filter value comes from a broadcast topic and always
uses the latest message as the filter criterion, then publishes to a
sink topic.

There is no checkpointing and state involved.

Then I am continuously seeing the GC overhead limit exceeded error and the pods
keep restarting.

So I tried to increase the heap size for task manager by

containers:
  - args:
    - task-manager
    - -Djobmanager.rpc.address=service-job-manager
    - -Dtaskmanager.heap.size=4096m
    - -Denv.java.opts.taskmanager="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dumps/oom.bin"

Three things I noticed:

1. I don't see the heap size for the task manager shown correctly in the UI.

[image: image.png]

2. I don't see the heap dump file /dumps/oom.bin in the restarted pod. Did I
set the java opts wrong?

3. I am continuously seeing the log below from all pods; not sure if it causes any
issue:
{"@timestamp":"2020-04-29T23:39:43.387Z","@version":"1","message":"[Consumer
clientId=consumer-1, groupId=aba774bc] Node 6 was unable to process the
fetch request with (sessionId=2054451921, epoch=474):
FETCH_SESSION_ID_NOT_FOUND.","logger_name":"org.apache.kafka.clients.FetchSessionHandler","thread_name":"pool-6-thread-1","level":"INFO","level_value":2}

Thanks a lot for any help!

Best,
Eleanore


Re: how to send back result via job manager to client

2020-04-19 Post by Eleanore Jin
Hi Kurt,

Thanks, I will read up on it and come back if I have any questions.

Best
Eleanore

On Sun, Apr 19, 2020 at 7:18 PM Kurt Young  wrote:

> You can take a look at this JIRA: https://issues.apache.org/jira/browse/FLINK-14807
>
> Best,
> Kurt
>
>
> On Mon, Apr 20, 2020 at 7:07 AM Eleanore Jin 
> wrote:
>
> > Hi,
> > I just read an article about using Flink for OLAP (
> https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/
> ),
> > and one of the points it mentions is:
> > [image: image.png]
> > This optimization comes from a characteristic of OLAP: OLAP sends the final computation result back to the client, forwarded to the Client via the JobManager.
> >
> > I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.
> >
> > Thanks!
> > Eleanore
> > Eleanore
> >
>


how to send back result via job manager to client

2020-04-19 Post by Eleanore Jin
Hi,
I just read an article about using Flink for OLAP (
https://ververica.cn/developers/olap-engine-performance-optimization-and-application-cases/),
and one of the points it mentions is:
[image: image.png]
This optimization comes from a characteristic of OLAP: OLAP sends the final computation result back to the client, forwarded to the Client via the JobManager.

I would like to ask how this is implemented: forwarding the result to the Client through the JobManager.

Thanks!
Eleanore


Start flink job from the latest checkpoint programmatically

2020-03-12 文章 Eleanore Jin
Hi All,

The setup of my Flink application allows the user to start and stop it.

The Flink job is running in a job cluster (the application jar is available
to Flink upon startup). Stopping a running application means exiting the
program.

Restarting a stopped job means spinning up a new job cluster with the same
application jar, but this essentially creates a new Flink job.

I just wonder whether there is a way to let the restarted job resume from
the latest checkpoint of the previously stopped Flink job. And is there a
way to set this up programmatically in the application?

Thanks a lot!
Eleanore
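
One approach, sketched below under the assumption that externalized (retained)
checkpoints fit the use case: the job enables checkpoint retention on cancellation,
so the last completed checkpoint stays in durable storage when the job cluster is
torn down; the new job cluster then has to be pointed at that checkpoint path on
startup (for a job cluster this is typically the savepoint/--fromSavepoint restore
argument, which also accepts a retained checkpoint path, but check the options of
your entrypoint). The S3 path below is made up:

```
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpoint to durable storage so the checkpoint outlives the job cluster pods.
env.setStateBackend(new FsStateBackend("s3://my-bucket/flink-checkpoints"));
env.enableCheckpointing(60_000);

// Retain the last completed checkpoint when the job is cancelled or the
// cluster is shut down, so a later run can resume from it.
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```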


Re: scaling issue Running Flink on Kubernetes

2020-03-11 文章 Eleanore Jin
Hi Flavio,

We have implemented our own Flink operator; the operator starts a Flink
job cluster (the application jar is already packaged together with Flink in
the Docker image). I believe Google's Flink operator starts a session
cluster, and the user can submit the Flink job via REST. I have not looked
into the Lyft one before.

Eleanore


On Wed, Mar 11, 2020 at 2:21 AM Flavio Pompermaier 
wrote:

> Sorry I wanted to mention https://github.com/lyft/flinkk8soperator (I
> don't know which one of the 2 is better)
>
> On Wed, Mar 11, 2020 at 10:19 AM Flavio Pompermaier 
> wrote:
>
>> Have you tried to use existing operators such as
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator or
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator?
>>
>> On Wed, Mar 11, 2020 at 4:46 AM Xintong Song 
>> wrote:
>>
>>> Hi Eleanore,
>>>
>>> That does't sound like a scaling issue. It's probably a data skew, that
>>> the data volume on some of the keys are significantly higher than others.
>>> I'm not familiar with this area though, and have copied Jark for you, who
>>> is one of the community experts in this area.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Wed, Mar 11, 2020 at 10:37 AM Eleanore Jin 
>>> wrote:
>>>
>>>> _Hi Xintong,
>>>>
>>>> Thanks for the prompt reply! To answer your question:
>>>>
>>>>- Which Flink version are you using?
>>>>
>>>>v1.8.2
>>>>
>>>>- Is this skew observed only after a scaling-up? What happens if
>>>>the parallelism is initially set to the scaled-up value?
>>>>
>>>>I also tried this, it seems skew also happens even I do
>>>> not change the parallelism, so it may not caused by scale-up/down
>>>>
>>>>- Keeping the job running a while after the scale-up, does the skew
>>>>ease?
>>>>
>>>>So the skew happens in such a way that: some partitions
>>>> lags down to 0, but other partitions are still at level of 10_000, and I am
>>>> seeing the back pressure is ok.
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>>
>>>> On Tue, Mar 10, 2020 at 7:03 PM Xintong Song 
>>>> wrote:
>>>>
>>>>> Hi Eleanore,
>>>>>
>>>>> I have a few more questions regarding your issue.
>>>>>
>>>>>- Which Flink version are you using?
>>>>>- Is this skew observed only after a scaling-up? What happens if
>>>>>the parallelism is initially set to the scaled-up value?
>>>>>- Keeping the job running a while after the scale-up, does the
>>>>>skew ease?
>>>>>
>>>>> I suspect the performance difference might be an outcome of some
>>>>> warming up issues. E.g., the existing TMs might have some file already
>>>>> localized, or some memory buffers already promoted to the JVM tenured 
>>>>> area,
>>>>> while the new TMs have not.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
>>>>> wrote:
>>>>>
>>>>>> Hi Experts,
>>>>>> I have my flink application running on Kubernetes, initially with 1
>>>>>> Job Manager, and 2 Task Managers.
>>>>>>
>>>>>> Then we have the custom operator that watches for the CRD, when the
>>>>>> CRD replicas changed, it will patch the Flink Job Manager deployment
>>>>>> parallelism and max parallelism according to the replicas from CRD
>>>>>> (parallelism can be configured via env variables for our application).
>>>>>> which causes the job manager restart. hence a new Flink job. But the
>>>>>> consumer group does not change, so it will continue from the offset
>>>>>> where it left.
>>>>>>
>>>>>> In addition, operator will also update Task Manager's deployment
>>>>>> replicas, and will adjust the pod number.
>>>>>>
>>>>>> In case of scale up, the existing task manager pods do not get
>>>>>> killed, but new task manager pods will be created.
>>>>>>
>>>>>> And we observed a skew in the partition offset consumed. e.g. some
>>>>>> partitions have huge lags and other partitions have small lags. (observed
>>>>>> from burrow)
>>>>>>
>>>>>> This is also validated by the metrics from Flink UI, showing the
>>>>>> throughput differs for slotss
>>>>>>
>>>>>> Any clue why this is the case?
>>>>>>
>>>>>> Thanks a lot!
>>>>>> Eleanore
>>>>>>
>>>>>
>>
>


Re: scaling issue Running Flink on Kubernetes

2020-03-10 文章 Eleanore Jin
Hi Xintong,

Thanks for the prompt reply! To answer your question:

   - Which Flink version are you using?

   v1.8.2

   - Is this skew observed only after a scaling-up? What happens if the
   parallelism is initially set to the scaled-up value?

   I also tried this; it seems the skew also happens even when I do not
change the parallelism, so it may not be caused by scale-up/down.

   - Keeping the job running a while after the scale-up, does the skew ease?

   The skew happens in such a way that some partitions' lags drop
down to 0, while other partitions' lags are still at the level of 10,000, and
the back pressure looks ok.

Thanks a lot!
Eleanore


On Tue, Mar 10, 2020 at 7:03 PM Xintong Song  wrote:

> Hi Eleanore,
>
> I have a few more questions regarding your issue.
>
>- Which Flink version are you using?
>- Is this skew observed only after a scaling-up? What happens if the
>parallelism is initially set to the scaled-up value?
>- Keeping the job running a while after the scale-up, does the skew
>ease?
>
> I suspect the performance difference might be an outcome of some warming
> up issues. E.g., the existing TMs might have some file already localized,
> or some memory buffers already promoted to the JVM tenured area, while the
> new TMs have not.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Mar 11, 2020 at 9:25 AM Eleanore Jin 
> wrote:
>
>> Hi Experts,
>> I have my flink application running on Kubernetes, initially with 1 Job
>> Manager, and 2 Task Managers.
>>
>> Then we have the custom operator that watches for the CRD, when the CRD
>> replicas changed, it will patch the Flink Job Manager deployment
>> parallelism and max parallelism according to the replicas from CRD
>> (parallelism can be configured via env variables for our application).
>> which causes the job manager restart. hence a new Flink job. But the
>> consumer group does not change, so it will continue from the offset
>> where it left.
>>
>> In addition, operator will also update Task Manager's deployment
>> replicas, and will adjust the pod number.
>>
>> In case of scale up, the existing task manager pods do not get killed,
>> but new task manager pods will be created.
>>
>> And we observed a skew in the partition offset consumed. e.g. some
>> partitions have huge lags and other partitions have small lags. (observed
>> from burrow)
>>
>> This is also validated by the metrics from Flink UI, showing the
>> throughput differs for slotss
>>
>> Any clue why this is the case?
>>
>> Thanks a lot!
>> Eleanore
>>
>


scaling issue Running Flink on Kubernetes

2020-03-10 文章 Eleanore Jin
Hi Experts,
I have my flink application running on Kubernetes, initially with 1 Job
Manager, and 2 Task Managers.

Then we have a custom operator that watches the CRD. When the CRD replicas
change, it patches the Flink Job Manager deployment's parallelism and max
parallelism according to the replicas from the CRD (parallelism can be
configured via env variables for our application), which causes the job
manager to restart and hence a new Flink job. But the consumer group does
not change, so it will continue from the offset where it left off.

In addition, the operator will also update the Task Manager deployment's
replicas and adjust the number of pods.

In case of a scale-up, the existing task manager pods do not get killed, but
new task manager pods will be created.

And we observed a skew in the partition offsets consumed, e.g. some
partitions have huge lags while other partitions have small lags (observed
from Burrow).

This is also validated by the metrics from the Flink UI, which show that the
throughput differs across slots.

Any clue why this is the case?

Thanks a lot!
Eleanore


Re: Is incremental checkpoints needed?

2020-03-10 文章 Eleanore Jin
Hi Arvid,

Thank you for the clarification!

Best,
Eleanore


On Tue, Mar 10, 2020 at 12:32 PM Arvid Heise  wrote:

> Hi Eleanore,
>
> incremental checkpointing would be needed if you have a large state
> (GB-TB), but between two checkpoints only little changes happen (KB-MB).
>
> There are two reasons for large state: large user state or large operator
> state coming from joins, windows, or grouping. In the end, you will see the
> total size in the web UI. If it's small and the checkpointing duration is low,
> there is absolutely no need to go incremental.
>
> On Tue, Mar 10, 2020 at 5:43 PM Eleanore Jin 
> wrote:
>
>> Hi All,
>>
>> I am using Apache Beam to construct the pipeline, and this pipeline is
>> running with Flink Runner.
>>
>> Both Source and Sink are Kafka topics, I have enabled Beam Exactly once
>> semantics.
>>
>> I believe how it works in beam is:
>> the messages will be cached and not processed by the
>> KafkaExactlyOnceSink, until the checkpoint completes and all the cached
>> messages are checkpointed, then it will start processing those messages.
>>
>> So is there any benefit to enable increment checkpointing when using
>> RocksDB as backend. Because I see the states as consumer offsets, and
>> cached messages in between checkpoints. Delta seems to be the complete new
>> checkpointed states.
>>
>> Thanks a lot!
>> Eleanore
>>
>
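
For reference, with the RocksDB backend incremental checkpointing is enabled by a
constructor flag; as noted above it only pays off when the state is large and the
per-checkpoint delta is small. A minimal sketch (the checkpoint URI is made up, and
the flink-statebackend-rocksdb dependency is assumed to be on the classpath):

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The second constructor argument enables incremental checkpoints: only the
        // RocksDB files created since the previous checkpoint are uploaded.
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true));
        env.enableCheckpointing(60_000);

        // ... build and execute the pipeline as usual ...
    }
}
```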


Is incremental checkpoints needed?

2020-03-10 文章 Eleanore Jin
Hi All,

I am using Apache Beam to construct the pipeline, and this pipeline is
running with Flink Runner.

Both the source and the sink are Kafka topics, and I have enabled Beam
exactly-once semantics.

I believe how it works in Beam is:
the messages are cached and not processed by the KafkaExactlyOnceSink
until the checkpoint completes and all the cached messages have been
checkpointed; only then does it start processing those messages.

So is there any benefit to enabling incremental checkpointing when using
RocksDB as the backend? Since the state I see consists of consumer offsets
and messages cached between checkpoints, the delta seems to be the complete
new checkpointed state.

Thanks a lot!
Eleanore


How to test flink job recover from checkpoint

2020-03-04 文章 Eleanore Jin
Hi,

I have a Flink application with checkpointing enabled, and I am running it
locally using the MiniCluster.

I just wonder if there is a way to simulate a failure and verify that the
Flink job restarts from the checkpoint?

Thanks a lot!
Eleanore
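
One common way to exercise this locally is to inject a one-off failure after at
least one checkpoint has completed and let the restart strategy bring the job back;
if the job resumes from the checkpointed source state instead of from the beginning,
recovery works. A rough sketch (the source, failure trigger, and all numbers are
arbitrary; a real test would use the actual pipeline and sources):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRecoveryTest {

    // Static flag survives the task restart because the local MiniCluster runs in
    // a single JVM, so the injected failure fires only once.
    private static volatile boolean failedOnce = false;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(1)));

        env.generateSequence(0, 10_000)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) throws Exception {
                        Thread.sleep(1); // slow things down so checkpoints complete first
                        if (!failedOnce && value == 5_000L) {
                            failedOnce = true;
                            throw new RuntimeException("injected failure to test recovery");
                        }
                        return value;
                    }
                })
                .print();

        env.execute("checkpoint-recovery-test");
    }
}
```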