Re: Zookeeper HA with Kubernetes: Possible to use the same Zookeeper cluster w/multiple Flink Operators?

2023-09-20 Thread Gyula Fóra
Hi!

The cluster-id for each FlinkDeployment is simply the name of the
deployment, so they are all different within a given namespace. (In other
words, they are not fixed as your question suggests, but set automatically.)

So there should be no problem sharing the ZK cluster.
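
To illustrate the resulting layout, here is a toy sketch (not operator code; the
quorum and deployment names are made up): the HA data of each deployment ends up
under <path.root>/<cluster-id>, so two deployments never share a znode subtree.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

/** Toy sketch: two deployments sharing one ZK quorum land under different znodes. */
public class ZkHaLayoutSketch {
    public static void main(String[] args) {
        for (String deploymentName : new String[] {"payments-job", "search-job"}) {
            Configuration conf = new Configuration();
            conf.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
            conf.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, "zk-0:2181,zk-1:2181,zk-2:2181");
            // Shared root for every cluster that uses this quorum.
            conf.set(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "/flink");
            // On native Kubernetes the cluster-id is set to the deployment name,
            // so each FlinkDeployment gets its own subtree.
            conf.set(HighAvailabilityOptions.HA_CLUSTER_ID, deploymentName);

            System.out.println("HA data for " + deploymentName + " lives under "
                    + conf.get(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT)
                    + "/" + conf.get(HighAvailabilityOptions.HA_CLUSTER_ID));
        }
    }
}
```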

Cheers
Gyula

On Thu, 21 Sep 2023 at 03:46, Brian King  wrote:

> Hello Flink Users!
>
> We're attempting to deploy a Flink application cluster on Kubernetes,
> using the Flink Operator and Zookeeper for HA.
>
> We're using Flink 1.16 and I have a question about some of the Zookeeper
> configuration[0]:
>
> "high-availability.zookeeper.path.root" is described as "The *root
> ZooKeeper node*, under which all cluster nodes are placed"
>
> "high-availability.cluster-id" , which says "important: customize per
> cluster." But it also says "you should not set this value manually when
> running on [...] native Kubernetes [...]in those cases a cluster-id is
> [...] automatically generated."
>
> Our design calls for multiple Flink application clusters managed by the
> same Flink Operator, and using the same Zookeeper quorum for each Flink
> Application cluster. Will the Flink Operator be able to handle this, or
> will the different clusters collide due to the fixed
> "high-availability.cluster-id" value? Is it possible to avoid this by
> setting a unique "high-availability.zookeeper.path.root" for each
> application cluster?
>
> Thanks for your time. I'm new to Flink, so I apologize if I did not
> explain myself clearly. Please let me know if you need additional
> information.
>
> Best,
>
> Brian King
> SRE, Data Platform/Search Platform
> Wikimedia Foundation
> IRC: inflatador
>
> [0]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#configuration
>
>
>


Flink Gzip Sink with Error

2023-09-20 Thread Yunhui Han
Hi all,

I want to write JSON strings with gzip compression in Flink, following the demo
on StackOverflow.
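
For context, below is a minimal sketch of a gzip BulkWriter used with a FileSink
bulk format; the class name and output path are hypothetical, and this is not the
exact code from that demo.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;

/** Minimal gzip BulkWriter that writes one JSON string per line. */
public class GzipJsonBulkWriter implements BulkWriter<String> {

    private final GZIPOutputStream gzip;

    public GzipJsonBulkWriter(FSDataOutputStream out) throws IOException {
        // Wrap Flink's output stream exactly once; wrapping it per element
        // would emit repeated gzip headers inside the same part file.
        this.gzip = new GZIPOutputStream(out);
    }

    @Override
    public void addElement(String json) throws IOException {
        gzip.write(json.getBytes(StandardCharsets.UTF_8));
        gzip.write('\n');
    }

    @Override
    public void flush() throws IOException {
        gzip.flush();
    }

    @Override
    public void finish() throws IOException {
        // Write the gzip trailer but leave closing the underlying stream to Flink.
        gzip.finish();
    }

    public static FileSink<String> sink(Path outputDir) {
        return FileSink
                .forBulkFormat(outputDir, (BulkWriter.Factory<String>) GzipJsonBulkWriter::new)
                .build();
    }
}
```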

I encountered a problem.

There is an ill-formatted string at the beginning of the gzip file.

[image: image.png]

And the separator between two JSON strings is also ill-formatted.

[image: image.png]
[image: image.png]

Can anyone help me with this?

Best, YunHui


RE: Re: Re: How to read flinkSQL job state

2023-09-20 Thread Yifan He via user
Hi Hangxiang,

I still have one question about this problem: when using the datastream api I
know the key and value types I use in state because I defined the
ValueStateDescriptor myself, but how can I get the ValueStateDescriptor in
flinksql?
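
For context, below is a rough State Processor API sketch of reading keyed state
when the operator uid, state name, and types are already known; all of these names
are hypothetical, and for flinksql jobs they are generated internally, which is
exactly what makes this hard.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    // Reader for a ValueState<Long> registered under the (hypothetical) name "my-state".
    static class MyStateReader extends KeyedStateReaderFunction<String, Long> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(state.value());
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use the state backend the job was configured with (e.g. EmbeddedRocksDBStateBackend).
        SavepointReader savepoint = SavepointReader.read(
                env, "s3://bucket/path/to/checkpoint", new HashMapStateBackend());

        // "my-operator-uid" and "my-state" must match what the job registered;
        // for SQL jobs these are generated internally.
        DataStream<Long> values =
                savepoint.readKeyedState("my-operator-uid", new MyStateReader());

        values.print();
        env.execute("read-keyed-state");
    }
}
```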

Thanks,
Yifan

On 2023/09/07 06:16:41 Hangxiang Yu wrote:
> Hi, Yifan.
> Which flink version are you using?
> You are using filesystem instead of rocksdb, so your checkpoint size
> may not be incremental, IIUC.
>
> On Thu, Sep 7, 2023 at 10:52 AM Yifan He via user 
> wrote:
>
> > Hi Shammon,
> >
> > We are using RocksDB,and the configuration is below:
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> > execution.checkpointing.max-concurrent-checkpoints: 1
> > execution.checkpointing.min-pause: 0
> > execution.checkpointing.mode: EXACTLY_ONCE
> > execution.checkpointing.snapshot-compression: true
> > execution.checkpointing.timeout: 6
> > state.backend: FILESYSTEM
> > state.backend.incremental: true
> > state.backend.local-recovery: true
> > state.backend.rocksdb.memory.high-prio-pool-ratio: 0.1
> > state.backend.rocksdb.memory.managed: true
> > state.backend.rocksdb.memory.write-buffer-ratio: 0.5
> > state.backend.rocksdb.predefined-options: DEFAULT
> > state.backend.rocksdb.timer-service.factory: ROCKSDB
> > state.checkpoints.num-retained: 3
> >
> > Thanks,
> > Yifan
> >
> > On 2023/09/06 08:00:31 Shammon FY wrote:
> > > Hi Yifan,
> > >
> > > Besides reading job state, I would like to know what statebackend are
you
> > > using? Can you give the configurations about state and checkpoint for
> > your
> > > job? Maybe you can check these configuration items to confirm if they
are
> > > correct first.
> > >
> > > Best,
> > > Shammon FY
> > >
> > > On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:
> > >
> > > > Hi, Yifan.
> > > >
> > > > I think the document[1] means to let us convert the DataStream to
the
> > > > Table[2]. Then we could handle the state with the Table API & SQL.
> > > >
> > > > Best,
> > > > Hang
> > > >
> > > > [1]
> > > >
> >
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> > > > [2]
> > > >
> >
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
> > > >
> > > > Yifan He via user  wrote on Wednesday, 2023-09-06 at 13:10:
> > > >
> > > >> Hi team,
> > > >>
> > > >> We are investigating why the checkpoint size of our FlinkSQL jobs
> > keeps
> > > >> growing and we want to look into the checkpoint file to know what
is
> > > >> causing the problem. I know we can use the state processor api to
> > read the
> > > >> state of jobs using datastream api, but how can I read the state of
> > jobs
> > > >> using table api & sql?
> > > >>
> > > >> Thanks,
> > > >> Yifan
> > > >>
> > > >
> > >
> >
>
>
> --
> Best,
> Hangxiang.
>


RE: About Flink parquet format

2023-09-20 Thread Kamal Mittal via user
Yes.

Due to the error below, the Flink bulk writer never closes the part file and keeps 
creating new part files continuously. Is Flink not handling exceptions like the one 
below?

From: Feng Jin 
Sent: 20 September 2023 05:54 PM
To: Kamal Mittal 
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi

I tested it on my side and also got the same error. This should be a limitation 
of Parquet.

```
java.lang.IllegalArgumentException: maxCapacityHint can't be less than 
initialSlabSize 64 1
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) 
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153)
 ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at 
org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
```


So I think the current minimum page size that can be set for parquet is 64B.

Best,
Feng


On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello,

If the page size is given as 1 byte, then the exception encountered is: ‘maxCapacityHint 
can't be less than initialSlabSize %d %d’.

This is coming from the class CapacityByteArrayOutputStream, contained in the 
parquet-common library.

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 19 September 2023 01:01 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

What exception did you encounter? I have tested it locally and it works fine.


Best,
Feng


On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal <kamal.mit...@ericsson.com> wrote:
Hello,

Checkpointing is enabled and works fine if the configured parquet page size is at 
least 64 bytes; otherwise an exception is thrown at the back-end.

This looks like an issue that is not handled by the file sink bulk writer?

Rgds,
Kamal

From: Feng Jin <jinfeng1...@gmail.com>
Sent: 15 September 2023 04:14 PM
To: Kamal Mittal <kamal.mit...@ericsson.com>
Cc: user@flink.apache.org
Subject: Re: About Flink parquet format

Hi Kamal

Check whether checkpointing for the task is enabled and triggered correctly. By 
default, parquet writing rolls over to a new file on each checkpoint.


Best,
Feng

On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <user@flink.apache.org> wrote:
Hello,

Tried parquet file creation with file sink bulk writer.

If the parquet page size is configured as low as 1 byte (an allowed configuration), 
flink keeps creating multiple ‘in-progress’ part files with content only ‘PAR1’ and 
never closes the file.

I want to know the reason for not closing the file and for creating multiple 
‘in-progress’ part files, and why no error is given, if applicable?

Rgds,
Kamal


Re: Flink cdc 2.0: historical data too large, causing binlog backlog; how to solve it

2023-09-20 Thread jinzhuguang
Hi, apart from these operational measures, does flink cdc itself offer any solution? For example, not having to read the binlog from the very beginning in the incremental phase, since a lot of that data has in fact already been read.

> On 2023-09-20 at 21:00, Jiabao Sun  wrote:
> 
> Hi,
> In production it is recommended to keep at least 7 days of binlog, which gives more tolerance on failure-recovery time.
> In addition, you can try increasing the parallelism and resources of the snapshot phase to speed it up; once the snapshot is finished you can restore from a savepoint and reduce the resources.
> Best,
> Jiabao
> --
> From:jinzhuguang 
> Send Time: Wednesday, 2023-09-20 20:56
> To:user-zh 
> Subject: Flink cdc 2.0: historical data too large, causing binlog backlog; how to solve it
Taking mysql cdc as an example: the current overall flow is to first synchronize the full historical data 
and then start incremental synchronization. From the code, the initial offset for the incremental phase is 
chosen as the smallest high watermark among all the snapshot splits. If the full data set is very large 
(TB scale), the full synchronization can take a long time, but the binlog cannot be deleted during that 
time, so it piles up and takes a lot of space. Is there a common solution to this problem at the moment?



Re: RE: Does flink 1.17 no longer support hive 2.1?

2023-09-20 Thread yuxia
Revert this PR https://github.com/apache/flink/pull/19352 and then rebuild the flink hive 
connector package; that is all that is needed.

Best regards,
Yuxia

- Original Message -
From: "迎风浪子" <576637...@qq.com.INVALID>
To: "user-zh" 
Sent: Tuesday, September 19, 2023, 5:20:58 PM
Subject: RE: Does flink 1.17 no longer support hive 2.1?

We are still using hive 1.1.0; what should we do?



---Original Message---
From: "18579099920"<18579099...@163.com
Sent: Monday, September 18, 2023, 11:26 AM
To: "user-zh"

Zookeeper HA with Kubernetes: Possible to use the same Zookeeper cluster w/multiple Flink Operators?

2023-09-20 Thread Brian King
Hello Flink Users!

We're attempting to deploy a Flink application cluster on Kubernetes, using the 
Flink Operator and Zookeeper for HA.

We're using Flink 1.16 and I have a question about some of the Zookeeper 
configuration[0]:

"high-availability.zookeeper.path.root" is described as "The root ZooKeeper 
node, under which all cluster nodes are placed"

"high-availability.cluster-id" , which says "important: customize per cluster." 
But it also says "you should not set this value manually when running on [...] 
native Kubernetes [...]in those cases a cluster-id is [...] automatically 
generated."

Our design calls for multiple Flink application clusters managed by the same 
Flink Operator, and using the same Zookeeper quorum for each Flink Application 
cluster. Will the Flink Operator be able to handle this, or will the different 
clusters collide due to the fixed "high-availability.cluster-id" value? Is it 
possible to avoid this by setting a unique 
"high-availability.zookeeper.path.root" for each application cluster?

Thanks for your time. I'm new to Flink, so I apologize if I did not explain 
myself clearly. Please let me know if you need additional information.

Best,

Brian King
SRE, Data Platform/Search Platform
Wikimedia Foundation
IRC: inflatador

[0] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/ha/zookeeper_ha/#configuration




Re: Using Flink k8s operator on OKD

2023-09-20 Thread Krzysztof Chmielewski
Thank you Zach,
our flink-operator and flink deployments are in the same namespace, called
"flink". We had executed what is described in [1] before my initial
message.
We are using OKD 4.6.0, which according to the docs uses k8s 1.19. The
very same config works fine on "vanilla" k8s, but for some reason it
fails on the system where we have OKD installed.

I believe we do have proper roles/sa assigned, for example:

oc get sa
NAME             SECRETS   AGE
builder          2         6d22h
default          2         6d22h
deployer         2         6d22h
flink            2         6d19h
flink-operator   2         17h

oc get role
NAME    CREATED AT
flink   2023-09-13T11:53:42Z

oc get rolebinding
NAME                 ROLE         AGE
flink-role-binding   Role/flink   6d19h

[1]
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/rbac/


Thanks, Krzysztof Chmielewski

On Wed, 20 Sep 2023 at 05:40, Zach Lorimer  wrote:

> I haven’t used OKD but it sounds like OLM. If that’s the case, I’m
> assuming the operator was deployed to the “operators” namespace. In that
> case, you’ll need to create the RBACs and such in the Flink namespace for
> that deployment to work.
>
> For example this needs to be in each namespace that you want to have Flink
> deployments in.
>
> kubectl apply -f - <<EOF
> apiVersion: v1
> kind: ServiceAccount
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink
> ---
> apiVersion: rbac.authorization.k8s.io/v1
> kind: Role
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink
> rules:
> - apiGroups:
>   - ""
>   resources:
>   - pods
>   - configmaps
>   verbs:
>   - '*'
> - apiGroups:
>   - apps
>   resources:
>   - deployments
>   - deployments/finalizers
>   verbs:
>   - '*'
> ---
> apiVersion: rbac.authorization.k8s.io/v1
> kind: RoleBinding
> metadata:
>   labels:
> app.kubernetes.io/name: flink-kubernetes-operator
> app.kubernetes.io/version: 1.5.0
>   name: flink-role-binding
> roleRef:
>   apiGroup: rbac.authorization.k8s.io
>   kind: Role
>   name: flink
> subjects:
> - kind: ServiceAccount
>   name: flink
> EOF
>
> Hopefully that helps.
>
>
> On Tue, Sep 19, 2023 at 5:40 PM Krzysztof Chmielewski <
> krzysiek.chmielew...@gmail.com> wrote:
>
>> Hi community,
>> I was wondering if anyone tried to deploy Flink using Flink k8s operator
>> on machine where OKD [1] is installed?
>>
>> We have tried to install Flink k8s operator version 1.6 which seems to
>> succeed, however when we try to deploy simple Flink deployment we are
>> getting an error.
>>
>> 2023-09-19 10:11:36,440 i.j.o.p.e.ReconciliationDispatcher
>> [ERROR][flink/test] Error during event processing ExecutionScope{ resource
>> id: ResourceID{name='test', namespace='flink'}, version: 684949788} failed.
>>
>> io.fabric8.kubernetes.client.KubernetesClientException: Failure
>> executing: PUT at:
>> https://172.30.0.1:443/apis/flink.apache.org/v1beta1/namespaces/flink/flinkdeployments/test.
>> Message: FlinkDeployment.flink.apache.org "test" is invalid:
>> [spec.ingress: Invalid value: "null": spec.ingress in body must be of type
>> object: "null", spec.mode: Invalid value: "null": spec.mode in body must be
>> of type string: "null", spec.mode: Unsupported value: "null": supported
>> values: "native", "standalone", spec.logConfiguration: Invalid value:
>> "null": spec.logConfiguration in body must be of type object: "null",
>> spec.imagePullPolicy: Invalid value: "null": spec.imagePullPolicy in body
>> must be of type string: "null", spec.jobManager.podTemplate: Invalid value:
>> "null": spec.jobManager.podTemplate in body must be of type object: "null",
>> spec.jobManager.resource.ephemeralStorage: Invalid value: "null":
>> spec.jobManager.resource.ephemeralStorage in body must be of type string:
>> "null", spec.podTemplate: Invalid value: "null": spec.podTemplate in body
>> must be of type object: "null", spec.restartNonce: Invalid value: "null":
>> spec.restartNonce in body must be of type integer: "null",
>> spec.taskManager.replicas: Invalid value: "null": spec.taskManager.replicas
>> in body must be of type integer: "null",
>> spec.taskManager.resource.ephemeralStorage: Invalid value: "null":
>> spec.taskManager.resource.ephemeralStorage in body must be of type string:
>> "null", spec.taskManager.podTemplate: Invalid value: "null":
>> spec.taskManager.podTemplate in body must be of type object: "null",
>> spec.job: Invalid value: "null": spec.job in body must be of type object:
>> "null", .spec.taskManager.replicas: Invalid value: 0:
>> .spec.taskManager.replicas accessor error:  is of the type ,
>> expected int64]. Received status: Status(apiVersion=v1, code=422,
>> details=StatusDetails(causes=[StatusCause(field=spec.ingress,
>> 

Test message

2023-09-20 Thread Krzysztof Chmielewski
Community,
please forgive me for this message. This is a test, because all day my
replies to my other user thread have been rejected by the email server.

Sincere apologies
Krzysztof


Extract response stream out of a AsyncSinkBase operator

2023-09-20 Thread Bhupendra Yadav
Hey Everyone,
We have a use case where we want to extract a response
out of an AsyncSink operator (HTTP in our case) and perform more
transformations on top of it.

We implemented an HttpSink by following this blog:
https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/

Since, by design, sink operators do not return any response stream, we have
built our own custom writer operator (HttpSinkWriteOperator) by extending
AbstractStreamOperator and emitting the HTTP response into the output stream.

The HTTP operator works fine when used as a sink, but when we try to extract
the response from it and write it to S3 using stream.sinkTo(S3Sink), we see
that the job checkpoints take too long and eventually fail after the
configured 5-minute timeout.

We are trying to use AsyncSinkBase because of its built-in capability of
*adaptive rate limiting* and retries, as opposed to RichAsyncFunction.

Is this fundamentally the correct thing to do? Is it possible to extract the
response out of an operator implemented using AsyncSinkBase?
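
For comparison, here is a minimal sketch of the operator-style alternative
mentioned above: a plain RichAsyncFunction whose responses stay in the DataStream
and can then be written to S3. The endpoint and stream names are hypothetical, and
this sketch does not provide the adaptive rate limiting that AsyncSinkBase offers.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/** Async HTTP call modeled as an operator, so responses stay in the stream. */
public class HttpAsyncFunction extends RichAsyncFunction<String, String> {

    private transient HttpClient client;

    @Override
    public void open(Configuration parameters) {
        client = HttpClient.newHttpClient();
    }

    @Override
    public void asyncInvoke(String payload, ResultFuture<String> resultFuture) {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create("https://example.com/endpoint")) // hypothetical endpoint
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
        client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .whenComplete((response, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(response.body()));
                    }
                });
    }
}

// Usage sketch: responses remain a DataStream, so any sink can follow, e.g. S3.
// DataStream<String> responses = AsyncDataStream.unorderedWait(
//         requests, new HttpAsyncFunction(), 30, TimeUnit.SECONDS, 100);
// responses.sinkTo(s3Sink);
```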


Flink cdc 2.0: historical data too large, causing binlog backlog; how to solve it

2023-09-20 Thread jinzhuguang
Taking mysql cdc as an example: the current overall flow is to first synchronize the full historical data 
and then start incremental synchronization. From the code, the initial offset for the incremental phase is 
chosen as the smallest high watermark among all the snapshot splits. If the full data set is very large 
(TB scale), the full synchronization can take a long time, but the binlog cannot be deleted during that 
time, so it piles up and takes a lot of space. Is there a common solution to this problem at the moment?

Re: About Flink parquet format

2023-09-20 Thread Feng Jin
Hi

I tested it on my side and also got the same error. This should be a
limitation of Parquet.

```
java.lang.IllegalArgumentException: maxCapacityHint can't be less than
initialSlabSize 64 1
at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57)
~[flink-sql-parquet-1.17.1.jar:1.17.1]
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(
CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder
.<init>(RunLengthBitPackingHybridEncoder.jav
```


So I think the current minimum page size that can be set for parquet is 64B.
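
For reference, a hedged sketch of where the page size is set when building the
parquet writer factory by hand (the schema variable is an assumption); keeping it
at or above 64 bytes, and in practice much higher, avoids the check above. With
the SQL parquet format the equivalent knob should be the forwarded
'parquet.page.size' option.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.ParquetBuilder;
import org.apache.flink.formats.parquet.ParquetWriterFactory;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.io.OutputFile;

public class ParquetPageSizeSketch {

    public static FileSink<GenericRecord> buildSink(Path outputDir, Schema schema) {
        ParquetBuilder<GenericRecord> builder = (OutputFile out) ->
                AvroParquetWriter.<GenericRecord>builder(out)
                        .withSchema(schema)
                        // Anything below 64 bytes trips the initialSlabSize check above;
                        // the parquet default (around 1 MB) is usually what you want.
                        .withPageSize(64 * 1024)
                        .build();

        return FileSink
                .forBulkFormat(outputDir, new ParquetWriterFactory<>(builder))
                .build();
    }
}
```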

Best,
Feng


On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal 
wrote:

> Hello,
>
>
>
> If the page size is given as 1 byte, then the exception encountered is
> ‘maxCapacityHint can't be less than initialSlabSize %d %d’.
>
>
>
> This is coming from the class CapacityByteArrayOutputStream, contained in
> the parquet-common library.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 19 September 2023 01:01 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> What exception did you encounter? I have tested it locally and it works
> fine.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> Checkpointing is enabled and works fine if the configured parquet page size is
> at least 64 bytes; otherwise an exception is thrown at the back-end.
>
>
>
> This looks like an issue that is not handled by the file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check whether checkpointing for the task is enabled and triggered correctly. By
> default, parquet writing rolls over to a new file on each checkpoint.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If the parquet page size is configured as low as 1 byte (an allowed configuration),
> flink keeps creating multiple ‘in-progress’ part files with content only ‘PAR1’
> and never closes the file.
>
>
>
> I want to know the reason for not closing the file and for creating
> multiple ‘in-progress’ part files, and why no error is given, if applicable?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: Flink cdc 2.0: historical data too large, causing binlog backlog; how to solve it

2023-09-20 Thread Jiabao Sun
Hi,
In production it is recommended to keep at least 7 days of binlog, which gives more tolerance on failure-recovery time.
In addition, you can try increasing the parallelism and resources of the snapshot phase to speed it up; once the snapshot is finished you can restore from a savepoint and reduce the resources.
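
As a rough DataStream sketch of that second suggestion (hypothetical connection
details, flink-cdc 2.x MySqlSource, not a tested job): run the snapshot phase with
higher parallelism, then stop with a savepoint and restore with fewer resources.

```java
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcSnapshotJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("mysql-host")          // hypothetical connection details
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flink")
                .password("***")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // Snapshot splits are read in parallel, so higher parallelism here shortens
        // the snapshot phase and the binlog retention window it requires.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc")
                .setParallelism(8)
                .print();

        env.execute("mysql-cdc-snapshot");
    }
}
```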
Best,
Jiabao
--
From:jinzhuguang 
Send Time: Wednesday, 2023-09-20 20:56
To:user-zh 
Subject: Flink cdc 2.0: historical data too large, causing binlog backlog; how to solve it
Taking mysql cdc as an example: the current overall flow is to first synchronize the full historical data 
and then start incremental synchronization. From the code, the initial offset for the incremental phase is 
chosen as the smallest high watermark among all the snapshot splits. If the full data set is very large 
(TB scale), the full synchronization can take a long time, but the binlog cannot be deleted during that 
time, so it piles up and takes a lot of space. Is there a common solution to this problem at the moment?

